chamikaramj commented on a change in pull request #13884:
URL: https://github.com/apache/beam/pull/13884#discussion_r569635579



##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -552,6 +511,38 @@ def run_pipeline(self, pipeline, options):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
 
+    # Optimize the pipeline if it not streaming and the pre_optimize
+    # experiment is set.
+    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
+        'pre_optimize', 'default').lower()
+    from apache_beam.runners.portability.fn_api_runner import translations
+    if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
+        pre_optimize == 'default'):
+      phases = []
+    elif pre_optimize == 'all':
+      phases = [
+          translations.eliminate_common_key_with_none,
+          # TODO(BEAM-11694): Enable translations.pack_combiners
+          # translations.pack_combiners,
+          translations.sort_stages
+      ]
+    else:
+      phases = []
+      for phase_name in pre_optimize.split(','):
+        # For now, these are all we allow.
+        if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):

Review comment:
       Why is "eliminate_common_key_with_none" rejected here but enabled for
the 'all' case above?

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -552,6 +511,38 @@ def run_pipeline(self, pipeline, options):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
 
+    # Optimize the pipeline if it not streaming and the pre_optimize
+    # experiment is set.
+    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
+        'pre_optimize', 'default').lower()
+    from apache_beam.runners.portability.fn_api_runner import translations
+    if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
+        pre_optimize == 'default'):
+      phases = []
+    elif pre_optimize == 'all':
+      phases = [
+          translations.eliminate_common_key_with_none,
+          # TODO(BEAM-11694): Enable translations.pack_combiners
+          # translations.pack_combiners,
+          translations.sort_stages
+      ]
+    else:
+      phases = []
+      for phase_name in pre_optimize.split(','):
+        # For now, these are all we allow.
+        if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
+          phases.append(getattr(translations, phase_name))
+        else:
+          raise ValueError(
+              'Unknown or inapplicable phase for pre_optimize: %s' % 
phase_name)
+      phases.append(translations.sort_stages)
+
+    self.proto_pipeline = translations.optimize_pipeline(

Review comment:
       Note that any optimization here that uses the "pipeline -> from_runner_api
-> to_runner_api" trick will break for cross-language (x-lang) pipelines.
(Inspecting the code, though, it seems only combiner packing uses that trick, so this is probably OK.)

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -552,6 +511,38 @@ def run_pipeline(self, pipeline, options):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
 
+    # Optimize the pipeline if it not streaming and the pre_optimize
+    # experiment is set.
+    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
+        'pre_optimize', 'default').lower()
+    from apache_beam.runners.portability.fn_api_runner import translations
+    if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
+        pre_optimize == 'default'):
+      phases = []
+    elif pre_optimize == 'all':
+      phases = [
+          translations.eliminate_common_key_with_none,

Review comment:
       Do we already have tests for the optimizations enabled here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to