This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5ea504d [BEAM-11715] Partial revert of "Combiner packing in Dataflow" (#13763) (#13884)
5ea504d is described below
commit 5ea504de2eb187dca733f6087aea780dc781040d
Author: Yifan Mai <[email protected]>
AuthorDate: Thu Feb 4 20:30:17 2021 -0800
[BEAM-11715] Partial revert of "Combiner packing in Dataflow" (#13763) (#13884)
* Revert "[BEAM-11695] Combiner packing in Dataflow (#13763)"
This reverts commit 3b51aaac556bcdc89b661793b55c4aca9a803e51.
* Make pack_combiners optional
* Don't revert translations.py
* Add missing ValidatesRunner
---
.../runners/dataflow/dataflow_runner.py | 103 ++++++++++-----------
.../runners/dataflow/dataflow_runner_test.py | 1 +
.../runners/dataflow/ptransform_overrides.py | 27 ------
.../portability/fn_api_runner/translations_test.py | 1 +
4 files changed, 49 insertions(+), 83 deletions(-)
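
Since this change makes pack_combiners opt-in rather than part of the
'all' alias, here is a minimal sketch of opting back in (the toy pipeline
is hypothetical and GCP project/region flags are omitted): naming the
phases explicitly in the pre_optimize experiment still enables packing,
per the phase-parsing branch retained in the diff below.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Explicitly listed phases are still honored by DataflowRunner; only the
# 'all' alias omits pack_combiners after this revert (see BEAM-11694).
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--experiment=pre_optimize=eliminate_common_key_with_none,pack_combiners',
])

pipeline = beam.Pipeline(options=options)
_ = (
    pipeline
    | beam.Create([('a', 1), ('a', 2), ('b', 3)])
    | beam.CombinePerKey(sum))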
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 8fa69fb..6590366 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -462,62 +462,6 @@ class DataflowRunner(PipelineRunner):
self._maybe_add_unified_worker_missing_options(options)
- from apache_beam.transforms import environments
- if options.view_as(SetupOptions).prebuild_sdk_container_engine:
- # If prebuild_sdk_container_engine is specified, we will build a new SDK
- # container image with dependencies pre-installed and use that image
- # instead of the inferred default container image.
- self._default_environment = (
- environments.DockerEnvironment.from_options(options))
- options.view_as(WorkerOptions).worker_harness_container_image = (
- self._default_environment.container_image)
- else:
- self._default_environment = (
- environments.DockerEnvironment.from_container_image(
- apiclient.get_container_image_from_options(options),
- artifacts=environments.python_sdk_dependencies(options)))
-
- # Optimize the pipeline if it is not streaming and optimizations are enabled
- # in options.
- pre_optimize = options.view_as(DebugOptions).lookup_experiment(
- 'pre_optimize', 'default').lower()
- if (not options.view_as(StandardOptions).streaming and
- pre_optimize != 'none' and pre_optimize != 'default'):
- from apache_beam.runners.portability.fn_api_runner import translations
- if pre_optimize == 'all':
- phases = [
- translations.eliminate_common_key_with_none,
- translations.pack_combiners,
- translations.sort_stages
- ]
- else:
- phases = []
- for phase_name in pre_optimize.split(','):
- # For now, these are all we allow.
- if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
- phases.append(getattr(translations, phase_name))
- else:
- raise ValueError(
- 'Unknown or inapplicable phase for pre_optimize: %s' % phase_name)
- phases.append(translations.sort_stages)
-
- proto_pipeline_to_optimize = pipeline.to_runner_api(
- default_environment=self._default_environment)
- optimized_proto_pipeline = translations.optimize_pipeline(
- proto_pipeline_to_optimize,
- phases=phases,
- known_runner_urns=frozenset(),
- partial=True)
- pipeline = beam.Pipeline.from_runner_api(
- optimized_proto_pipeline, self, options)
- # The translations.pack_combiners optimizer phase produces a CombinePerKey
- # PTransform, but DataflowRunner treats CombinePerKey as a composite, so
- # this override expands CombinePerKey into primitive PTransforms.
- if translations.pack_combiners in phases:
- from apache_beam.runners.dataflow.ptransform_overrides import CombinePerKeyPTransformOverride
- pipeline.replace_all([CombinePerKeyPTransformOverride()])
-
use_fnapi = apiclient._use_fnapi(options)
if not use_fnapi:
@@ -544,6 +488,21 @@ class DataflowRunner(PipelineRunner):
if use_fnapi and not apiclient._use_unified_worker(options):
pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
+ from apache_beam.transforms import environments
+ if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+ # If prebuild_sdk_container_engine is specified, we will build a new SDK
+ # container image with dependencies pre-installed and use that image
+ # instead of the inferred default container image.
+ self._default_environment = (
+ environments.DockerEnvironment.from_options(options))
+ options.view_as(WorkerOptions).worker_harness_container_image = (
+ self._default_environment.container_image)
+ else:
+ self._default_environment = (
+ environments.DockerEnvironment.from_container_image(
+ apiclient.get_container_image_from_options(options),
+ artifacts=environments.python_sdk_dependencies(options)))
+
# This has to be performed before pipeline proto is constructed to make sure
# that the changes are reflected in the portable job submission path.
self._adjust_pipeline_for_dataflow_v2(pipeline)
@@ -552,6 +511,38 @@ class DataflowRunner(PipelineRunner):
self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
return_context=True, default_environment=self._default_environment)
+ # Optimize the pipeline if it is not streaming and the pre_optimize
+ # experiment is set.
+ pre_optimize = options.view_as(DebugOptions).lookup_experiment(
+ 'pre_optimize', 'default').lower()
+ from apache_beam.runners.portability.fn_api_runner import translations
+ if (options.view_as(StandardOptions).streaming or pre_optimize == 'none' or
+ pre_optimize == 'default'):
+ phases = []
+ elif pre_optimize == 'all':
+ phases = [
+ translations.eliminate_common_key_with_none,
+ # TODO(BEAM-11694): Enable translations.pack_combiners
+ # translations.pack_combiners,
+ translations.sort_stages
+ ]
+ else:
+ phases = []
+ for phase_name in pre_optimize.split(','):
+ # For now, these are all we allow.
+ if phase_name in ('eliminate_common_key_with_none', 'pack_combiners'):
+ phases.append(getattr(translations, phase_name))
+ else:
+ raise ValueError(
+ 'Unknown or inapplicable phase for pre_optimize: %s' % phase_name)
+ phases.append(translations.sort_stages)
+
+ self.proto_pipeline = translations.optimize_pipeline(
+ self.proto_pipeline,
+ phases=phases,
+ known_runner_urns=frozenset(),
+ partial=True)
+
if use_fnapi:
self._check_for_unsupported_fnapi_features(self.proto_pipeline)
else:
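
For illustration, a rough standalone sketch of the relocated optimization
step (the toy pipeline is hypothetical; the optimize_pipeline call mirrors
the hunk above): the selected phases now run over the constructed pipeline
proto, rather than round-tripping the Pipeline object through
from_runner_api as the removed code did.

import apache_beam as beam
from apache_beam.runners.portability.fn_api_runner import translations

p = beam.Pipeline()
_ = (
    p
    | beam.Create([('k', 1), ('k', 2)])
    | beam.CombinePerKey(sum))

# Optimize the proto directly, as the runner now does post-construction.
proto = p.to_runner_api()
optimized = translations.optimize_pipeline(
    proto,
    phases=[
        translations.eliminate_common_key_with_none,
        translations.sort_stages,  # always appended last, as in the diff
    ],
    known_runner_urns=frozenset(),
    partial=True)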
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index d921980..9074d97 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -877,6 +877,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
self._test_pack_combiners(
PipelineOptions(self.default_properties), expect_packed=False)
+ @unittest.skip("BEAM-11694")
def test_pack_combiners_enabled_by_experiment(self):
self.default_properties.append('--experiment=pre_optimize=all')
self._test_pack_combiners(
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 14300de..402a4ed 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -125,33 +125,6 @@ class JrhReadPTransformOverride(PTransformOverride):
'Read'))
-class CombinePerKeyPTransformOverride(PTransformOverride):
- """A ``PTransformOverride`` for ``CombinePerKey``.
-
- The translations.pack_combiners optimizer phase produces a CombinePerKey
- PTransform, but DataflowRunner treats CombinePerKey as a composite, so
- this override expands CombinePerKey into primitive PTransforms.
- """
- def matches(self, applied_ptransform):
- # Imported here to avoid circular dependencies.
- # pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam import CombinePerKey
-
- if isinstance(applied_ptransform.transform, CombinePerKey):
- self.transform = applied_ptransform.transform
- return True
- return False
-
- def get_replacement_transform(self, ptransform):
- from apache_beam.transforms import ptransform_fn
-
- @ptransform_fn
- def ExpandCombinePerKey(pcoll):
- return pcoll | ptransform
-
- return ExpandCombinePerKey()
-
-
class CombineValuesPTransformOverride(PTransformOverride):
"""A ``PTransformOverride`` for ``CombineValues``.
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
index d57307f..7ca5e3f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
@@ -259,6 +259,7 @@ class TranslationsTest(unittest.TestCase):
| Create([('a', x) for x in vals])
| 'multiple-combines' >> MultipleCombines())
+ @attr('ValidatesRunner')
def test_run_packable_combine_globally(self):
class MultipleCombines(beam.PTransform):
def expand(self, pcoll):
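
For context, a hedged sketch of a packable composite of this kind (the
combiner choices are illustrative, not the actual test body): several
combiners consume the same input PCollection, which makes them candidates
for fusion by the pack_combiners phase.

import apache_beam as beam
from apache_beam.transforms import combiners

class MultipleCombines(beam.PTransform):
    def expand(self, pcoll):
        # Sibling combiners over the same input are packable into a
        # single fused combine by translations.pack_combiners.
        _ = pcoll | 'mean-globally' >> combiners.Mean.Globally()
        _ = pcoll | 'count-globally' >> combiners.Count.Globally()
        _ = pcoll | 'largest-globally' >> combiners.Top.Largest(2)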