This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 55e10313341 Various render runner improvements. (#28285)
55e10313341 is described below
commit 55e1031334197f9dbaa9d9e83023fcb56a61e6d6
Author: Robert Bradshaw <[email protected]>
AuthorDate: Tue Sep 5 10:02:00 2023 -0700
Various render runner improvements. (#28285)
* Error message rather than silence when no rendering requested.
* Fix render_leaf_composite_nodes to act on names rather than ids.
* Allow rendering of raw .dot files when graphviz is not installed.
---
sdks/python/apache_beam/io/iobase.py | 3 ++-
sdks/python/apache_beam/runners/render.py | 36 ++++++++++++++++++++++++--
sdks/python/apache_beam/runners/render_test.py | 13 ++++++++++
3 files changed, 49 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/io/iobase.py
b/sdks/python/apache_beam/io/iobase.py
index e15205ead4d..96f154dbe4b 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -908,7 +908,8 @@ class Read(ptransform.PTransform):
return (
pbegin
| Impulse()
- | core.Map(lambda _: self.source).with_output_types(BoundedSource)
+ | 'EmitSource' >>  # explicit stable label; presumably replaces a default name derived from the lambda (readable in rendered graphs)
+ core.Map(lambda _: self.source).with_output_types(BoundedSource)
| SDFBoundedSourceReader(display_data))
elif isinstance(self.source, ptransform.PTransform):
# The Read transform can also admit a full PTransform as an input
diff --git a/sdks/python/apache_beam/runners/render.py
b/sdks/python/apache_beam/runners/render.py
index 55e15cb8562..306bf8c2090 100644
--- a/sdks/python/apache_beam/runners/render.py
+++ b/sdks/python/apache_beam/runners/render.py
@@ -129,6 +129,12 @@ class RenderOptions(pipeline_options.PipelineOptions):
help='Set to also log input pipeline proto to stdout.')
return parser
+ def __init__(self, *args, render_testing=False, **kwargs):  # render_testing=True skips the output-target check below (used by tests)
+ super().__init__(*args, **kwargs)  # NOTE(review): the check below runs at construction time; confirm options not supplied as flags are visible here
+ if self.render_port < 0 and not self.render_output and not render_testing:
+ raise ValueError(
+ 'At least one of --render_port or --render_output must be provided.')
+
class PipelineRenderer:
def __init__(self, pipeline, options):
@@ -147,8 +153,10 @@ class PipelineRenderer:
# Figure out at what point to stop rendering composite internals.
if options.render_leaf_composite_nodes:
- is_leaf = lambda name: any(
- re.match(pattern, name)
+ is_leaf = lambda transform_id: any(  # patterns are matched (re.match, anchored at start) against the transform's unique_name, not its id
+ re.match(
+ pattern,
+ self.pipeline.components.transforms[transform_id].unique_name)
for patterns in options.render_leaf_composite_nodes
for pattern in patterns.split(','))
self.leaf_composites = set()
@@ -403,6 +411,30 @@ class RenderRunner(runner.PipelineRunner):
if render_options.log_proto:
logging.info(pipeline_proto)
renderer = PipelineRenderer(pipeline_proto, render_options)
+ try:
+ subprocess.run(['dot', '-V'], capture_output=True, check=True)  # probe for the Graphviz dot executable
+ except FileNotFoundError as exn:
+ # If dot is not available, we can at least output the raw .dot files.
+ dot_files = [  # render_output may be None (e.g. only --render_port given)
+ output for output in render_options.render_output or []
+ if output.endswith('.dot')
+ ]
+ for output in dot_files:
+ with open(output, 'w') as fout:
+ fout.write(renderer.to_dot())
+ logging.info("Wrote pipeline as %s", output)
+
+ non_dot_files = set(render_options.render_output or []) - set(dot_files)
+ if non_dot_files:
+ raise RuntimeError(
+ "Graphviz dot executable not available "
+ f"for rendering non-dot output files {non_dot_files}") from exn
+ elif render_options.render_port >= 0:
+ raise RuntimeError(
+ "Graphviz dot executable not available for serving") from exn
+
+ return RenderPipelineResult(None)
+
renderer.page()
if render_options.render_port >= 0:
diff --git a/sdks/python/apache_beam/runners/render_test.py
b/sdks/python/apache_beam/runners/render_test.py
index 5872e003aec..4dca2b8b522 100644
--- a/sdks/python/apache_beam/runners/render_test.py
+++ b/sdks/python/apache_beam/runners/render_test.py
@@ -83,6 +83,19 @@ class RenderRunnerTest(unittest.TestCase):
renderer.update(toggle=[create_transform_id])
renderer.render_data()
+ def test_leaf_composite_filter(self):
+ try:  # skip the test when the Graphviz dot executable is not installed
+ subprocess.run(['dot', '-V'], capture_output=True, check=True)
+ except FileNotFoundError:
+ self.skipTest('dot executable not installed')
+ p = beam.Pipeline()
+ _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x)
+ dot = render.PipelineRenderer(
+ p.to_runner_api(),
+ render.RenderOptions(['--render_leaf_composite_nodes=Create'],
+ render_testing=True)).to_dot()  # render_testing bypasses the render_port/render_output check
+ self.assertEqual(dot.count('->'), 1)  # presumably the single Create -> Map edge once Create's internals are collapsed
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)