tvalentyn commented on code in PR #28564:
URL: https://github.com/apache/beam/pull/28564#discussion_r1433809471


##########
sdks/python/apache_beam/runners/portability/expansion_service.py:
##########
@@ -128,3 +130,13 @@ def with_pipeline(component, pcoll_id=None):
     except Exception:  # pylint: disable=broad-except
       return beam_expansion_api_pb2.ExpansionResponse(
           error=traceback.format_exc())
+
+  def artifact_service(self):
+    return artifact_service.ArtifactRetrievalService(file_reader)

Review Comment:
   I think you can use this file_reader: 
      `artifact_service.BeamFilesystemHandler(None).file_reader` instead of 
reimplementing.
   
   Also let's CC Cham to review this bit after we iron out other issues.
   



##########
sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py:
##########
@@ -211,15 +211,13 @@ def 
test_environment_override_translation_legacy_worker_harness_image(self):
           p | ptransform.Create([1, 2, 3])
           | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)])
           | ptransform.GroupByKey())
-    self.assertEqual(
-        list(remote_runner.proto_pipeline.components.environments.values()),
-        [
-            beam_runner_api_pb2.Environment(
-                urn=common_urns.environments.DOCKER.urn,
-                payload=beam_runner_api_pb2.DockerPayload(
-                    container_image='LEGACY').SerializeToString(),
-                capabilities=environments.python_sdk_docker_capabilities())
-        ])
+    first = remote_runner.proto_pipeline.components.environments.values()
+    second = beam_runner_api_pb2.Environment(
+        urn=common_urns.environments.DOCKER.urn,
+        payload=beam_runner_api_pb2.DockerPayload(
+            container_image='LEGACY').SerializeToString(),
+        capabilities=environments.python_sdk_docker_capabilities())
+    self.assertTrue(first.__eq__(second))

Review Comment:
   could we phrase it with `self.assertEqual(expected, actual)`? why did we 
have to change the helper?



##########
sdks/python/apache_beam/runners/portability/stager.py:
##########
@@ -850,3 +861,40 @@ def _create_beam_sdk(sdk_remote_location, temp_dir):
     return [
         Stager._create_file_stage_to_artifact(local_download_file, staged_name)
     ]
+
+  @staticmethod
+  def _create_stage_submission_env_dependencies(temp_dir):
+    """Create and stage a file with list of dependencies installed in the
+    submission environment.
+
+    This staged file is used at runtime to compare the dependencies in the

Review Comment:
   ```suggestion
       This list can be used at runtime to compare against the dependencies in 
the
   ```



##########
sdks/python/apache_beam/transforms/environments.py:
##########
@@ -125,9 +125,17 @@ def __init__(self,
         dict(resource_hints) if resource_hints else {})
 
   def __eq__(self, other):
+    equal_artifacts = True
+    for first, second in zip(self._artifacts, other._artifacts):
+      # do not compare type payload since it contains a unique hash.

Review Comment:
   This comparison is brittle because:
   
   1) we rely on a particular ordering of self._artifacts. It sounds like the 
reason you are changing it is that you have two essentially equal environments 
that include an artifact with identical payload/sha256 but different source 
file location. The source file location is affected by temp folder that 
introduces randomness. This randomness will be reflected in 
`artifact.SerializeToString()`, which is used as a key to `sort 
self._artifacts` (line 121). Because of that, there is no guarantee that when 
you iterate over `self._artifacts` and `other._artifacts`, multiple artifacts 
will be ordered in a consistent manner.
   2) ignoring entire type.payload might backfire. I would rather use a 
different key for staged files, than `x.SerializeToString()`. For local files, 
it could be evaluated based on sha256 and destination name but ignore the 
source location. Then, you can use such key when sorting artifacts, and when 
comparing them. Having such key would mean that we'd have to decode 
ArtifactInformation, and potentially compare different artifact types/roles 
differently. We could still fall back to `x.SerializeToString()`.
   
   Alternatively you could adjust requirement generation to use the same temp 
directory for generating the pip freeze over the course of the python process 
or the test. I think we could do something to prevent the issue you are dealing 
with here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to