This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fc53397  Fix translations.optimize_pipeline() failure on empty pipelines (#12804)
fc53397 is described below

commit fc53397fc7619fa57d61f09f4405e0a180b014a1
Author: Yifan Mai <[email protected]>
AuthorDate: Thu Oct 1 17:17:33 2020 -0700

    Fix translations.optimize_pipeline() failure on empty pipelines (#12804)
---
 sdks/python/apache_beam/pipeline.py                            |  7 +++++--
 .../runners/portability/fn_api_runner/translations_test.py     | 10 ++++++++++
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 3e39998..c45282e 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -888,8 +888,11 @@ class Pipeline(object):
         proto.components,
         allow_proto_holders=allow_proto_holders,
         requirements=proto.requirements)
-    root_transform_id, = proto.root_transform_ids
-    p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
+    if proto.root_transform_ids:
+      root_transform_id, = proto.root_transform_ids
+      p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
+    else:
+      p.transforms_stack = [AppliedPTransform(None, None, '', None)]
     # TODO(robertwb): These are only needed to continue construction. Omit?
     p.applied_labels = {
         t.unique_name
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
index 42e9f32..10bf8c0 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
@@ -22,6 +22,7 @@ import logging
 import unittest
 
 import apache_beam as beam
+from apache_beam import runners
 from apache_beam.options import pipeline_options
 from apache_beam.portability import common_urns
 from apache_beam.runners.portability.fn_api_runner import translations
@@ -119,6 +120,15 @@ class TranslationsTest(unittest.TestCase):
     self.assertEqual(len(combine_per_key_stages), 1)
     self.assertIn('/Pack', combine_per_key_stages[0].name)
 
+  def test_optimize_empty_pipeline(self):
+    pipeline = beam.Pipeline()
+    pipeline_proto = pipeline.to_runner_api()
+    optimized_pipeline_proto = translations.optimize_pipeline(
+        pipeline_proto, [], known_runner_urns=frozenset(), partial=True)
+    runner = runners.DirectRunner()
+    beam.Pipeline.from_runner_api(
+        optimized_pipeline_proto, runner, pipeline_options.PipelineOptions())
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

Reply via email to