This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 55e10313341 Various render runner improvements. (#28285)
55e10313341 is described below

commit 55e1031334197f9dbaa9d9e83023fcb56a61e6d6
Author: Robert Bradshaw <[email protected]>
AuthorDate: Tue Sep 5 10:02:00 2023 -0700

    Various render runner improvements. (#28285)
    
    * Error message rather than silence when no rendering requested.
    * Fix render_leaf_composite_nodes to act on names rather than ids.
    * Allow rendering of raw .dot files when graphviz is not installed.
---
 sdks/python/apache_beam/io/iobase.py           |  3 ++-
 sdks/python/apache_beam/runners/render.py      | 36 ++++++++++++++++++++++++--
 sdks/python/apache_beam/runners/render_test.py | 13 ++++++++++
 3 files changed, 49 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index e15205ead4d..96f154dbe4b 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -908,7 +908,8 @@ class Read(ptransform.PTransform):
       return (
           pbegin
           | Impulse()
-          | core.Map(lambda _: self.source).with_output_types(BoundedSource)
+          | 'EmitSource' >>
+          core.Map(lambda _: self.source).with_output_types(BoundedSource)
           | SDFBoundedSourceReader(display_data))
     elif isinstance(self.source, ptransform.PTransform):
       # The Read transform can also admit a full PTransform as an input
diff --git a/sdks/python/apache_beam/runners/render.py b/sdks/python/apache_beam/runners/render.py
index 55e15cb8562..306bf8c2090 100644
--- a/sdks/python/apache_beam/runners/render.py
+++ b/sdks/python/apache_beam/runners/render.py
@@ -129,6 +129,12 @@ class RenderOptions(pipeline_options.PipelineOptions):
         help='Set to also log input pipeline proto to stdout.')
     return parser
 
+  def __init__(self, *args, render_testing=False, **kwargs):
+    super().__init__(*args, **kwargs)
+    if self.render_port < 0 and not self.render_output and not render_testing:
+      raise ValueError(
+          'At least one of --render_port or --render_output must be provided.')
+
 
 class PipelineRenderer:
   def __init__(self, pipeline, options):
@@ -147,8 +153,10 @@ class PipelineRenderer:
 
     # Figure out at what point to stop rendering composite internals.
     if options.render_leaf_composite_nodes:
-      is_leaf = lambda name: any(
-          re.match(pattern, name)
+      is_leaf = lambda transform_id: any(
+          re.match(
+              pattern,
+              self.pipeline.components.transforms[transform_id].unique_name)
           for patterns in options.render_leaf_composite_nodes
           for pattern in patterns.split(','))
       self.leaf_composites = set()
@@ -403,6 +411,30 @@ class RenderRunner(runner.PipelineRunner):
     if render_options.log_proto:
       logging.info(pipeline_proto)
     renderer = PipelineRenderer(pipeline_proto, render_options)
+    try:
+      subprocess.run(['dot', '-V'], capture_output=True, check=True)
+    except FileNotFoundError as exn:
+      # If dot is not available, we can at least output the raw .dot files.
+      dot_files = [
+          output for output in render_options.render_output
+          if output.endswith('.dot')
+      ]
+      for output in dot_files:
+        with open(output, 'w') as fout:
+          fout.write(renderer.to_dot())
+          logging.info("Wrote pipeline as %s", output)
+
+      non_dot_files = set(render_options.render_output) - set(dot_files)
+      if non_dot_files:
+        raise RuntimeError(
+            "Graphviz dot executable not available "
+            f"for rendering non-dot output files {non_dot_files}") from exn
+      elif render_options.render_port >= 0:
+        raise RuntimeError(
+            "Graphviz dot executable not available for serving") from exn
+
+      return RenderPipelineResult(None)
+
     renderer.page()
 
     if render_options.render_port >= 0:
diff --git a/sdks/python/apache_beam/runners/render_test.py b/sdks/python/apache_beam/runners/render_test.py
index 5872e003aec..4dca2b8b522 100644
--- a/sdks/python/apache_beam/runners/render_test.py
+++ b/sdks/python/apache_beam/runners/render_test.py
@@ -83,6 +83,19 @@ class RenderRunnerTest(unittest.TestCase):
     renderer.update(toggle=[create_transform_id])
     renderer.render_data()
 
+  def test_leaf_composite_filter(self):
+    try:
+      subprocess.run(['dot', '-V'], capture_output=True, check=True)
+    except FileNotFoundError:
+      self.skipTest('dot executable not installed')
+    p = beam.Pipeline()
+    _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
+    dot = render.PipelineRenderer(
+        p.to_runner_api(),
+        render.RenderOptions(['--render_leaf_composite_nodes=Create'],
+                             render_testing=True)).to_dot()
+    self.assertEqual(dot.count('->'), 1)
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

Reply via email to