[ https://issues.apache.org/jira/browse/BEAM-9655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17082990#comment-17082990 ]
Léopold Boudard commented on BEAM-9655:
---------------------------------------
I also tried to package it as an external (cross-language) Java transform:
[https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Deduplicate.java]
https://stackoverflow.com/questions/57384102/combining-java-and-python-in-apache-beam-pipeline
However, cross-language transforms don't seem to be functional yet on the Dataflow runner in 2.19.0. I ran into the following error:
{code:python}
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/runners/pipeline_context.py", line 102, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/pipeline.py", line 1045, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/runners/pipeline_context.py", line 102, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/pipeline.py", line 1045, in from_runner_api
    part = context.transforms.get_by_id(transform_id)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/runners/pipeline_context.py", line 102, in get_by_id
    self._id_to_proto[id], self._pipeline_context)
  File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/.venv/lib/python3.6/site-packages/apache_beam/pipeline.py", line 1053, in from_runner_api
    assert isinstance(result.transform, ParDo)
AssertionError
{code}
{code:python}
result.transform
<RunnerAPIPTransformHolder(PTransform) label=[RunnerAPIPTransformHolder] at 0x11ca39eb8>
{code}
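That is, the assertion fails because {{result.transform}} is a {{RunnerAPIPTransformHolder}} rather than a {{ParDo}}, for the transform: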
{code:java}
KeyByElement/Map/ParMultiDo(Anonymous)
{code}
Thanks!
> Stateful Dataflow runner?
> -------------------------
>
> Key: BEAM-9655
> URL: https://issues.apache.org/jira/browse/BEAM-9655
> Project: Beam
> Issue Type: Wish
> Components: runner-dataflow
> Affects Versions: 2.19.0
> Reporter: Léopold Boudard
> Priority: Major
>
> Hi,
> I'm trying to use a [BagStateSpec|https://beam.apache.org/releases/pydoc/2.6.0/apache_beam.transforms.userstate.html] with the Python SDK on the portable DataflowRunner.
> However, I encounter the following issue:
> {code:python}
> Traceback (most recent call last):
>   File "/Users/leopold/.pyenv/versions/3.6.0/lib/python3.6/runpy.py", line 193, in _run_module_as_main
>     "__main__", mod_spec)
>   File "/Users/leopold/.pyenv/versions/3.6.0/lib/python3.6/runpy.py", line 85, in _run_code
>     exec(code, run_globals)
>   File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/test_runner.py", line 49, in <module>
>     run()
>   File "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/test_runner.py", line 44, in run
>     | 'write to file' >> WriteToText(known_args.output)
>   File "/Users/leopold/.pyenv/versions/BenchmarkListingStreaming/lib/python3.6/site-packages/apache_beam/pipeline.py", line 481, in __exit__
>     self.run().wait_until_finish()
>   File "/Users/leopold/.pyenv/versions/BenchmarkListingStreaming/lib/python3.6/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 1449, in wait_until_finish
>     (self.state, getattr(self._runner, 'last_error_msg', None)), self)
> apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 648, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", line 176, in execute
>     op.start()
>   File "apache_beam/runners/worker/operations.py", line 649, in apache_beam.runners.worker.operations.DoOperation.start
>   File "apache_beam/runners/worker/operations.py", line 651, in apache_beam.runners.worker.operations.DoOperation.start
>   File "apache_beam/runners/worker/operations.py", line 652, in apache_beam.runners.worker.operations.DoOperation.start
>   File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.Operation.start
>   File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.Operation.start
>   File "apache_beam/runners/worker/operations.py", line 597, in apache_beam.runners.worker.operations.DoOperation.setup
>   File "apache_beam/runners/worker/operations.py", line 636, in apache_beam.runners.worker.operations.DoOperation.setup
>   File "apache_beam/runners/common.py", line 866, in apache_beam.runners.common.DoFnRunner.__init__
> Exception: Requested execution of a stateful DoFn, but no user state context is available. This likely means that the current runner does not support the execution of stateful DoFns.
> {code}
> I've also seen this issue reported on Stack Overflow:
> [https://stackoverflow.com/questions/55413690/does-google-dataflow-support-stateful-pipelines-developed-with-python-sdk]
>
> Do you have any idea or ETA for when this feature will be available in Beam?
>
> Thanks!
>
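For reference, here is a minimal sketch of the kind of stateful DoFn the issue describes. {{DedupPerKeyFn}} and {{run_dedup}} are hypothetical names (this is not the reporter's actual pipeline): the DoFn keeps the values already seen for each key in a {{BagStateSpec}} and emits only first occurrences, loosely a per-key analogue of the Java {{Deduplicate}} transform linked in the comment, but without its time-based state expiry. It assumes {{apache_beam}} is installed and runs on the local DirectRunner, which, to my knowledge, supports user state in the Python SDK; according to the report, it is this kind of DoFn that fails with "no user state context is available" on Dataflow in 2.19.0.

{code:python}
# Hypothetical sketch (assumes apache_beam is installed): per-key dedup using bag state.
import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms.userstate import BagStateSpec


class DedupPerKeyFn(beam.DoFn):
    """Emits a (key, value) pair only the first time `value` is seen for `key`."""

    # Bag state holding the values already seen; Beam scopes it per key and window.
    SEEN = BagStateSpec('seen', StrUtf8Coder())

    def process(self, element, seen=beam.DoFn.StateParam(SEEN)):
        key, value = element
        if value not in set(seen.read()):
            seen.add(value)
            yield key, value


def run_dedup(pairs, expected):
    """Runs DedupPerKeyFn on the local DirectRunner and checks its output."""
    with beam.Pipeline(runner='DirectRunner') as p:
        deduped = (
            p
            | beam.Create(pairs)  # stateful DoFns require keyed (key, value) input
            | beam.ParDo(DedupPerKeyFn()))
        # Raises when the pipeline finishes if the output differs from `expected`.
        assert_that(deduped, equal_to(expected))


if __name__ == '__main__':
    run_dedup([('a', 'x'), ('a', 'x'), ('a', 'y'), ('b', 'x')],
              [('a', 'x'), ('a', 'y'), ('b', 'x')])
{code}

Reading the whole bag on every element keeps the sketch short, but state grows with the number of distinct values per key; the Java {{Deduplicate}} transform bounds its state by expiring it after a configurable duration.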
--
This message was sent by Atlassian Jira
(v8.3.4#803005)