[https://issues.apache.org/jira/browse/BEAM-12586?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17531859#comment-17531859]
Kenneth Knowles commented on BEAM-12586:
----------------------------------------
At the very least, the DirectRunner should detect this case and fail with a
clear error such as "cannot flatten together bounded and unbounded
PCollections". We may not have anyone to work on a real fix soon, so that
would at least help users.
Question: can this be worked around by inserting dummy steps? I am wondering
whether the failure comes from some detail of the graph rewriting that won't
always occur.
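A rough sketch of the proposed check, written as plain Python rather than against Beam's internals. The Step class and the is_bounded/check_flatten helpers are hypothetical, and the sketch assumes a step's output is unbounded whenever any upstream source is unbounded. It models the pipeline from the report and rejects the Flatten with the suggested message before any graph rewriting would happen.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Step:
    """Hypothetical pipeline step: a source, a per-element map, or a flatten."""
    label: str
    kind: str  # "source", "map" or "flatten"
    inputs: List["Step"] = field(default_factory=list)
    bounded_source: Optional[bool] = None  # only set for sources


def is_bounded(step: Step) -> bool:
    """Assumption: output is bounded only if every upstream source is bounded."""
    if step.kind == "source":
        return bool(step.bounded_source)
    return all(is_bounded(parent) for parent in step.inputs)


def check_flatten(step: Step) -> None:
    """Raise a readable error if a Flatten mixes bounded and unbounded inputs."""
    if step.kind != "flatten":
        return
    boundedness = {is_bounded(parent) for parent in step.inputs}
    if len(boundedness) > 1:
        labels = ", ".join(parent.label for parent in step.inputs)
        raise ValueError(
            f"{step.label}: cannot flatten together bounded and unbounded "
            f"PCollections ({labels})")


# The pipeline from the report, modelled with the hypothetical Step class.
gcs = Step("Read from GCS", "source", bounded_source=True)
gcs_dicts = Step("convert to dict", "map", [gcs])
pubsub = Step("Read from Pubsub", "source", bounded_source=False)
live_dicts = Step("convert to dict2", "map", [pubsub])
flatten = Step("flatten", "flatten", [gcs_dicts, live_dicts])

try:
    check_flatten(flatten)
except ValueError as err:
    # flatten: cannot flatten together bounded and unbounded PCollections (convert to dict, convert to dict2)
    print(err)
```

A real check would presumably run over the runner's own pipeline graph before applying overrides; the point here is only that the condition is cheap to detect up front.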
> Python Direct Runner doesn't support both streaming & non streaming sources
> ---------------------------------------------------------------------------
>
> Key: BEAM-12586
> URL: https://issues.apache.org/jira/browse/BEAM-12586
> Project: Beam
> Issue Type: Bug
> Components: runner-direct, sdk-py-core
> Affects Versions: 2.30.0
> Reporter: Christophe Rodriguez
> Priority: P2
>
> Please see Stack Overflow discussion:
> [https://stackoverflow.com/questions/68125864/transform-node-appliedptransform-was-not-replaced-as-expected-error-with-the-dir]
> When I create a GCS source and a Pub/Sub source and try to flatten them, the
> pipeline fails because the direct runner applies an incompatible
> transformation.
> Code example:
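> {code:python}
> import json
> import apache_beam as beam
> from apache_beam.options.pipeline_options import PipelineOptions
>
> # ReadFromPubSub requires streaming mode, so streaming=True is assumed here.
> with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
>     gcsEventsColl = (p
>         | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log")
>         | 'convert to dict' >> beam.Map(json.loads))
>     liveEventsColl = (p
>         | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic")
>         | 'convert to dict2' >> beam.Map(json.loads))
>     input_rec = (gcsEventsColl, liveEventsColl) | 'flatten' >> beam.Flatten()
> {code}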
> Error:
> {noformat}
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline
>     pipeline.replace_all(_get_transform_overrides(options))
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all
>     self._check_replacement(override)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement
>     self.visit(ReplacementValidator())
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit
>     self._root_transform().visit(visitor, self, visited)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
>     part.visit(visitor, pipeline, visited)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
>     part.visit(visitor, pipeline, visited)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
>     part.visit(visitor, pipeline, visited)
>   [Previous line repeated 4 more times]
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit
>     visitor.visit_transform(self)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform
>     transform_node)
> RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
> {noformat}
> The direct runner corrupts the pipeline when it rewrites the transforms.
>
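The traceback shows where the error is raised: after Pipeline.replace_all applies the DirectRunner's transform overrides, _check_replacement walks the whole graph with a ReplacementValidator, and that visitor appears to raise as soon as it finds a transform an override still matches. A simplified model of that validation pass is below. TransformNode, walk, check_replacement and matches_gbk_only are hypothetical stand-ins, not Beam code, and the graph is a hand-built example of the state after a rewrite that left one nested _GroupByKeyOnly in place. It does not explain why the real rewrite misses that node.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterator, List


@dataclass
class TransformNode:
    """Hypothetical graph node: a label, a transform kind, nested parts."""
    label: str
    kind: str
    parts: List["TransformNode"] = field(default_factory=list)


def walk(node: TransformNode) -> Iterator[TransformNode]:
    """Visit a node and, recursively, every nested part (a full graph visit)."""
    yield node
    for part in node.parts:
        yield from walk(part)


def check_replacement(root: TransformNode,
                      matches: Callable[[TransformNode], bool]) -> None:
    """Fail if any node in the graph still matches the override."""
    for node in walk(root):
        if matches(node):
            raise RuntimeError(
                f"Transform node {node.label} was not replaced as expected.")


def matches_gbk_only(node: TransformNode) -> bool:
    # Stand-in for an override's matching predicate.
    return node.kind == "_GroupByKeyOnly"


# Hand-built graph state after a rewrite that missed one nested node.
root = TransformNode("root", "composite", [
    TransformNode("Read from GCS", "composite", [
        TransformNode("nested GroupByKey", "_GroupByKeyOnly"),
    ]),
    TransformNode("flatten", "Flatten"),
])

try:
    check_replacement(root, matches_gbk_only)
except RuntimeError as err:
    print(err)  # Transform node nested GroupByKey was not replaced as expected.
```

Because the validator visits every nested part, a match left anywhere in the expansion (here, deep inside the GCS read) is enough to fail the whole run.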
--
This message was sent by Atlassian Jira
(v8.20.7#820007)