[
https://issues.apache.org/jira/browse/BEAM-12586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Beam JIRA Bot updated BEAM-12586:
---------------------------------
Priority: P3 (was: P2)
> Python Direct Runner doesn't support both streaming & non streaming sources
> ---------------------------------------------------------------------------
>
> Key: BEAM-12586
> URL: https://issues.apache.org/jira/browse/BEAM-12586
> Project: Beam
> Issue Type: Bug
> Components: runner-direct, sdk-py-core
> Affects Versions: 2.30.0
> Reporter: Christophe Rodriguez
> Priority: P3
> Labels: stale-P2
>
> Please see Stack Overflow discussion:
> [https://stackoverflow.com/questions/68125864/transform-node-appliedptransform-was-not-replaced-as-expected-error-with-the-dir]
> When I create a GCS source and a Pub/Sub source and try to flatten them,
> the pipeline fails with an error that appears to come from an incompatible
> transform replacement performed by the direct runner.
> Code example:
> {code:python}
> import json
> import apache_beam as beam
>
> # p is assumed to be an existing beam.Pipeline.
> gcsEventsColl = (
>     p
>     | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log")
>     | "convert to dict" >> beam.Map(json.loads))
> liveEventsColl = (
>     p
>     | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic")
>     | "convert to dict2" >> beam.Map(json.loads))
> input_rec = (gcsEventsColl, liveEventsColl) | "flatten" >> beam.Flatten()
> {code}
> Error:
> {code:java}
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline
>     pipeline.replace_all(_get_transform_overrides(options))
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all
>     self._check_replacement(override)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement
>     self.visit(ReplacementValidator())
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit
>     self._root_transform().visit(visitor, self, visited)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
>     part.visit(visitor, pipeline, visited)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
>     part.visit(visitor, pipeline, visited)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
>     part.visit(visitor, pipeline, visited)
>   [Previous line repeated 4 more times]
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit
>     visitor.visit_transform(self)
>   File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform
>     transform_node)
> RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.
> {code}
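> The direct runner appears to leave the pipeline in an inconsistent state
> when it applies its transform overrides: the ReplacementValidator check run
> from pipeline.replace_all still finds a _GroupByKeyOnly node that should
> have been replaced.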
>
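The last frames of the traceback suggest that, after an override is applied, the pipeline tree is walked again and any node the override still matches triggers the RuntimeError. The following is a minimal, standard-library-only sketch of that validation pattern, not Beam's actual implementation: `Node`, `visit`, and `check_replacement` are hypothetical names introduced for illustration, and the tree is a simplified stand-in for the real transform hierarchy.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class Node:
    """Hypothetical stand-in for an applied transform in a pipeline tree."""
    label: str
    kind: str
    parts: List["Node"] = field(default_factory=list)


def visit(node: Node, visitor: Callable[[Node], None]) -> None:
    """Visit all sub-transforms depth-first, then the node itself."""
    for part in node.parts:
        visit(part, visitor)
    visitor(node)


def check_replacement(root: Node, matches: Callable[[Node], bool]) -> None:
    """Raise if any node the override matches is still present in the tree."""
    def validator(node: Node) -> None:
        if matches(node):
            raise RuntimeError(
                "Transform node %s was not replaced as expected." % node.label)
    visit(root, validator)


# A tree in which one _GroupByKeyOnly node survived the replacement pass.
leftover = Node("Read from GCS/GroupByKey", "_GroupByKeyOnly")
root = Node("root", "Composite", [
    Node("Read from GCS", "Composite", [leftover]),
    Node("flatten", "Flatten"),
])


def is_gbk_only(node: Node) -> bool:
    return node.kind == "_GroupByKeyOnly"


try:
    check_replacement(root, is_gbk_only)
    message = None
except RuntimeError as err:
    message = str(err)
```

In this sketch the validator only reports that a matching node remains; it does not explain why the replacement pass skipped it, which is the open question in this issue.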
--
This message was sent by Atlassian Jira
(v8.20.7#820007)