robertwb commented on code in PR #22308:
URL: https://github.com/apache/beam/pull/22308#discussion_r926110124


##########
sdks/python/apache_beam/pipeline.py:
##########
@@ -919,11 +918,49 @@ def visit_transform(self, transform_node):
         requirements=context.requirements())
     proto.components.transforms[root_transform_id].unique_name = (
         root_transform_id)
+    self.merge_compatible_environments(proto)
     if return_context:
       return proto, context  # type: ignore  # too complicated for now
     else:
       return proto
 
+  @staticmethod
+  def merge_compatible_environments(proto):
+    """Tries to minimize the number of distinct environments by merging
+    those that are compatible (currently defined as identical).
+
+    Mutates proto as contexts may have references to proto.components.
+    """
+    env_map = {}
+    canonical_env = {}
+    files_by_hash = {}
+    for env_id, env in proto.components.environments.items():
+      # First deduplicate any file dependencies by their hash.
+      for dep in env.dependencies:
+        if dep.type_urn == common_urns.artifact_types.FILE.urn:
+          file_payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
+              dep.type_payload)
+          if file_payload.sha256:
+            if file_payload.sha256 in files_by_hash:
+              file_payload.path = files_by_hash[file_payload.sha256]

Review Comment:
   Yeah. We could probably clean that up a lot once Runner v1 goes away. 
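   For context, a rough sketch of the overall idea (not the PR's actual code; the hunk
   above is truncated before the merging step, so the remapping below is an assumption
   about how identical environments end up collapsed):

   ```python
   def merge_identical_environments(proto):
     # Map each serialized Environment to the first env id seen with that content.
     canonical_env = {}   # serialized Environment -> canonical env id
     env_map = {}         # env id -> canonical env id
     for env_id, env in proto.components.environments.items():
       key = env.SerializeToString(deterministic=True)
       canonical_env.setdefault(key, env_id)
       env_map[env_id] = canonical_env[key]
     # Drop duplicates and re-point transforms at the canonical ids.
     # (The real code would also need to update any other references,
     # e.g. windowing strategies.)
     for env_id, canonical_id in env_map.items():
       if env_id != canonical_id:
         del proto.components.environments[env_id]
     for transform in proto.components.transforms.values():
       if transform.environment_id:
         transform.environment_id = env_map[transform.environment_id]
   ```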



##########
sdks/python/apache_beam/pipeline.py:
##########
@@ -919,11 +918,49 @@ def visit_transform(self, transform_node):
         requirements=context.requirements())
     proto.components.transforms[root_transform_id].unique_name = (
         root_transform_id)
+    self.merge_compatible_environments(proto)
     if return_context:
       return proto, context  # type: ignore  # too complicated for now
     else:
       return proto
 
+  @staticmethod
+  def merge_compatible_environments(proto):
+    """Tries to minimize the number of distinct environments by merging
+    those that are compatible (currently defined as identical).
+
+    Mutates proto as contexts may have references to proto.components.
+    """
+    env_map = {}
+    canonical_env = {}
+    files_by_hash = {}
+    for env_id, env in proto.components.environments.items():
+      # First deduplicate any file dependencies by their hash.
+      for dep in env.dependencies:

Review Comment:
   It's not really well defined whether dependencies are ordered (e.g. the
order of jar files matters for class resolution), so I'm preserving the order
here just in case.
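   To make the intent concrete, an illustrative sketch (not the PR code) of the
   order-preserving part: only the path of a FILE artifact is rewritten when its
   sha256 has already been seen; the dependency list itself is never reordered or
   pruned, so e.g. jar ordering stays intact.

   ```python
   files_by_hash = {}
   for dep in env.dependencies:  # iterate in the original order
     if dep.type_urn == common_urns.artifact_types.FILE.urn:
       file_payload = beam_runner_api_pb2.ArtifactFilePayload.FromString(
           dep.type_payload)
       if file_payload.sha256:
         if file_payload.sha256 in files_by_hash:
           # Reuse the first path staged for this hash, rewriting in place.
           file_payload.path = files_by_hash[file_payload.sha256]
           dep.type_payload = file_payload.SerializeToString()
         else:
           files_by_hash[file_payload.sha256] = file_payload.path
   ```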



##########
sdks/python/apache_beam/pipeline_test.py:
##########
@@ -1291,6 +1291,56 @@ def CompositeTransform(pcoll):
         count += 1
     assert count == 2
 
+  def test_environments_are_deduplicated(self):
+    def file_artifact(path, hash, staged_name):
+      return beam_runner_api_pb2.ArtifactInformation(
+          type_urn=common_urns.artifact_types.FILE.urn,
+          type_payload=beam_runner_api_pb2.ArtifactFilePayload(
+              path=path, sha256=hash).SerializeToString(),
+          role_urn=common_urns.artifact_roles.STAGING_TO.urn,
+          role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload(
+              staged_name=staged_name).SerializeToString(),
+      )
+
+    proto = beam_runner_api_pb2.Pipeline(
+        components=beam_runner_api_pb2.Components(
+            transforms={
+                'transform1': beam_runner_api_pb2.PTransform(
+                    environment_id='e1'),
+                'transform2': beam_runner_api_pb2.PTransform(
+                    environment_id='e2'),
+                'transform3': beam_runner_api_pb2.PTransform(
+                    environment_id='e3'),
+                'transform4': beam_runner_api_pb2.PTransform(
+                    environment_id='e4'),
+            },
+            environments={
+                'e1': beam_runner_api_pb2.Environment(
+                    dependencies=[file_artifact('a1', 'x', 'a')]),
+                'e2': beam_runner_api_pb2.Environment(
+                    dependencies=[file_artifact('a2', 'x', 'a')]),
+                'e3': beam_runner_api_pb2.Environment(
+                    dependencies=[file_artifact('a3', 'y', 'a')]),

Review Comment:
   Done.
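   A hypothetical continuation of the truncated test above, just to show how the
   assertion side might look (the PR's actual assertions may differ, and this
   assumes Pipeline is importable in pipeline_test.py):

   ```python
   Pipeline.merge_compatible_environments(proto)
   # 'e1' and 'e2' share sha256 'x', so they should collapse into a single
   # environment, while 'e3' (sha256 'y') remains separate.
   remaining = set(
       t.environment_id for t in proto.components.transforms.values())
   assert len(remaining) < 4
   ```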



##########
sdks/python/apache_beam/pipeline_test.py:
##########
@@ -1291,6 +1291,56 @@ def CompositeTransform(pcoll):
         count += 1
     assert count == 2
 
+  def test_environments_are_deduplicated(self):
+    def file_artifact(path, hash, staged_name):
+      return beam_runner_api_pb2.ArtifactInformation(
+          type_urn=common_urns.artifact_types.FILE.urn,
+          type_payload=beam_runner_api_pb2.ArtifactFilePayload(
+              path=path, sha256=hash).SerializeToString(),
+          role_urn=common_urns.artifact_roles.STAGING_TO.urn,
+          role_payload=beam_runner_api_pb2.ArtifactStagingToRolePayload(
+              staged_name=staged_name).SerializeToString(),
+      )
+
+    proto = beam_runner_api_pb2.Pipeline(
+        components=beam_runner_api_pb2.Components(
+            transforms={
+                'transform1': beam_runner_api_pb2.PTransform(
+                    environment_id='e1'),
+                'transform2': beam_runner_api_pb2.PTransform(
+                    environment_id='e2'),
+                'transform3': beam_runner_api_pb2.PTransform(
+                    environment_id='e3'),
+                'transform4': beam_runner_api_pb2.PTransform(
+                    environment_id='e4'),
+            },
+            environments={
+                'e1': beam_runner_api_pb2.Environment(
+                    dependencies=[file_artifact('a1', 'x', 'a')]),

Review Comment:
   Done.


