This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fc53397 Fix translations.optimize_pipeline() failure on empty pipelines (#12804)
fc53397 is described below
commit fc53397fc7619fa57d61f09f4405e0a180b014a1
Author: Yifan Mai <[email protected]>
AuthorDate: Thu Oct 1 17:17:33 2020 -0700
Fix translations.optimize_pipeline() failure on empty pipelines (#12804)
---
sdks/python/apache_beam/pipeline.py | 7 +++++--
.../runners/portability/fn_api_runner/translations_test.py | 10 ++++++++++
2 files changed, 15 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 3e39998..c45282e 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -888,8 +888,11 @@ class Pipeline(object):
proto.components,
allow_proto_holders=allow_proto_holders,
requirements=proto.requirements)
- root_transform_id, = proto.root_transform_ids
- p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
+ if proto.root_transform_ids:
+ root_transform_id, = proto.root_transform_ids
+ p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
+ else:
+ p.transforms_stack = [AppliedPTransform(None, None, '', None)]
# TODO(robertwb): These are only needed to continue construction. Omit?
p.applied_labels = {
t.unique_name
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
index 42e9f32..10bf8c0 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
@@ -22,6 +22,7 @@ import logging
import unittest
import apache_beam as beam
+from apache_beam import runners
from apache_beam.options import pipeline_options
from apache_beam.portability import common_urns
from apache_beam.runners.portability.fn_api_runner import translations
@@ -119,6 +120,15 @@ class TranslationsTest(unittest.TestCase):
self.assertEqual(len(combine_per_key_stages), 1)
self.assertIn('/Pack', combine_per_key_stages[0].name)
+ def test_optimize_empty_pipeline(self):
+ pipeline = beam.Pipeline()
+ pipeline_proto = pipeline.to_runner_api()
+ optimized_pipeline_proto = translations.optimize_pipeline(
+ pipeline_proto, [], known_runner_urns=frozenset(), partial=True)
+ runner = runners.DirectRunner()
+ beam.Pipeline.from_runner_api(
+ optimized_pipeline_proto, runner, pipeline_options.PipelineOptions())
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)