yifanmai commented on a change in pull request #13884:
URL: https://github.com/apache/beam/pull/13884#discussion_r570556744
##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -552,6 +511,38 @@ def run_pipeline(self, pipeline, options):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
+    # Optimize the pipeline if it is not streaming and the pre_optimize
+    # experiment is set.
+    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
+        'pre_optimize', 'default').lower()
+    from apache_beam.runners.portability.fn_api_runner import translations
+    if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
+        pre_optimize == 'default'):
+      phases = []
+    elif pre_optimize == 'all':
+      phases = [
+          translations.eliminate_common_key_with_none,
+          # TODO(BEAM-11694): Enable translations.pack_combiners
+          # translations.pack_combiners,
+          translations.sort_stages
+      ]
+    else:
+      phases = []
+      for phase_name in pre_optimize.split(','):
+        # For now, these are all we allow.
+        if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
Review comment:
Actually, it's always enabled in `all`, and here we also allow it to be
enabled by specifying a comma-delimited list of phases; i.e., it's one of the
allowed phases. See the [previous
discussion](https://github.com/apache/beam/pull/13763#discussion_r565530996) in
the PR to be reverted.
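For reference, the selection logic in the quoted hunk can be restated as a small stand-alone function. This is only a sketch: `select_phases`, `ALLOWED_PHASES` and `ALL_PHASES` are hypothetical names, and phases are plain strings standing in for the `translations.*` functions so that it runs without Beam installed. It shows that `all` maps to a fixed list, while a comma-delimited value is checked against the allow-list and always gets `sort_stages` appended.

```python
# Hypothetical stand-ins for the translations.* phase functions.
# Phases that may be requested individually via pre_optimize=<a>,<b>,...
ALLOWED_PHASES = ('eliminate_common_key_with_none', 'pack_combiners')

# Phases applied for pre_optimize=all in the quoted hunk
# (pack_combiners is still commented out there, see TODO(BEAM-11694)).
ALL_PHASES = ['eliminate_common_key_with_none', 'sort_stages']


def select_phases(pre_optimize, streaming=False):
  """Return the names of the phases that would be applied, in order."""
  pre_optimize = pre_optimize.lower()
  # Streaming pipelines and the 'none'/'default' values skip optimization.
  if streaming or pre_optimize in ('none', 'default'):
    return []
  if pre_optimize == 'all':
    return list(ALL_PHASES)
  phases = []
  for phase_name in pre_optimize.split(','):
    # Only phases on the allow-list may be requested explicitly.
    if phase_name in ALLOWED_PHASES:
      phases.append(phase_name)
    else:
      raise ValueError(
          'Unknown or inapplicable phase for pre_optimize: %s' % phase_name)
  # sort_stages always runs last after an explicit list of phases.
  phases.append('sort_stages')
  return phases
```

Under this restatement, `select_phases('pack_combiners')` returns `['pack_combiners', 'sort_stages']`, and any streaming pipeline gets an empty list regardless of the experiment value.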
##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -552,6 +511,38 @@ def run_pipeline(self, pipeline, options):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
+    # Optimize the pipeline if it is not streaming and the pre_optimize
+    # experiment is set.
+    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
+        'pre_optimize', 'default').lower()
+    from apache_beam.runners.portability.fn_api_runner import translations
+    if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
+        pre_optimize == 'default'):
+      phases = []
+    elif pre_optimize == 'all':
+      phases = [
+          translations.eliminate_common_key_with_none,
+          # TODO(BEAM-11694): Enable translations.pack_combiners
+          # translations.pack_combiners,
+          translations.sort_stages
+      ]
+    else:
+      phases = []
+      for phase_name in pre_optimize.split(','):
+        # For now, these are all we allow.
+        if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
+          phases.append(getattr(translations, phase_name))
+        else:
+          raise ValueError(
+              'Unknown or inapplicable phase for pre_optimize: %s' %
+              phase_name)
+      phases.append(translations.sort_stages)
+
+    self.proto_pipeline = translations.optimize_pipeline(
Review comment:
Acknowledged. This PR removes `from_runner_api` from DataflowRunner, so
this shouldn't be an issue.
##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -552,6 +511,38 @@ def run_pipeline(self, pipeline, options):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
+    # Optimize the pipeline if it is not streaming and the pre_optimize
+    # experiment is set.
+    pre_optimize = options.view_as(DebugOptions).lookup_experiment(
+        'pre_optimize', 'default').lower()
+    from apache_beam.runners.portability.fn_api_runner import translations
+    if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
+        pre_optimize == 'default'):
+      phases = []
+    elif pre_optimize == 'all':
+      phases = [
+          translations.eliminate_common_key_with_none,
Review comment:
Yes, this is tested by
[`translations_test.TranslationsTest.test_run_packable_combine_globally`](https://github.com/apache/beam/blob/5c31997a18dd1434f903838fc2acdbe728b40abe/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py).
I had also forgotten to mark the test as ValidatesRunner, so I've added
that in this PR.
Note that the test only checks that the pipeline produces _correct_ results with
or without the optimization; it does not check that the optimization was
_actually performed_. That would require inspecting the optimized graph, which
has to be done differently for each runner, and this optimization may be
enabled in some runners and disabled in others.
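The same testing strategy can be illustrated without Beam. In this toy sketch (an analogy, not the actual `translations_test` code), `run_aggregates` is a hypothetical function that computes several global aggregates either with one pass per aggregate or with a single "packed" pass, loosely mirroring what combiner packing does. The check compares outputs only, so it would pass just as well if the packed path were never taken.

```python
def run_aggregates(values, pack=False):
  """Compute sum, max and count of a non-empty list, optionally 'packed'."""
  if not pack:
    # Unoptimized: one independent pass per aggregate.
    return {'sum': sum(values), 'max': max(values), 'count': len(values)}
  # "Packed": a single pass that updates all three accumulators together.
  total, largest, count = 0, None, 0
  for v in values:
    total += v
    largest = v if largest is None or v > largest else largest
    count += 1
  return {'sum': total, 'max': largest, 'count': count}


def check_same_result(values):
  # Compares results only; it cannot tell which code path actually ran.
  return run_aggregates(values) == run_aggregates(values, pack=True)
```

Verifying that the packed path really ran would need an extra hook, such as inspecting which code path was taken, which is the per-runner graph inspection the comment above deliberately avoids.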
----------------------------------------------------------------
This is an automated message from the Apache Git Service.