[ 
https://issues.apache.org/jira/browse/BEAM-9655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17080378#comment-17080378
 ] 

Léopold Boudard commented on BEAM-9655:
---------------------------------------

To keep you updated, the error above happens on bounded source.

On unbounded source, I face the same issue as

[https://stackoverflow.com/questions/55413690/does-google-dataflow-support-stateful-pipelines-developed-with-python-sdk]

I've also tried with timers, and had the following issue:
{code:java}
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error 
received from SDK harness for instruction -670594: Traceback (most recent call 
last): File 
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 282, in get processor = 
self.cached_bundle_processors[bundle_descriptor_id].pop() IndexError: pop from 
empty list During handling of the above exception, another exception occurred: 
Traceback (most recent call last): File 
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 167, in _execute response = task() File 
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 223, in <lambda> lambda: self.create_worker().do_instruction(request), 
request) File 
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 352, in do_instruction request.instruction_id) File 
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 380, in process_bundle instruction_id, 
request.process_bundle_descriptor_id) File 
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 288, in get self.data_channel_factory) File 
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 722, in __init__ op.setup() File 
"apache_beam/runners/worker/operations.py", line 597, in 
apache_beam.runners.worker.operations.DoOperation.setup File 
"apache_beam/runners/worker/operations.py", line 624, in 
apache_beam.runners.worker.operations.DoOperation.setup File 
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 611, in update_timer_receivers self._timer_receivers[tag] = 
receivers.pop(tag) KeyError: 'event_timer' 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) 
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:332)
 
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
 
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1350)
 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152)
 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073)
 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
java.lang.Thread.run(Thread.java:748) Caused by:
{code}
trying to use the exact same transform than:

[https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/deduplicate.py]

 

Which also indicates that userstatecontext has not been initialized properly.

>From what I can tell from the stack, and from what I can see from code examples

[https://beam.apache.org/blog/2017/02/13/stateful-processing.html]

I'm not sure if DataflowRunner is actually missing implementation, or if it 
just hasn't been initialized with proper arguments.

Though the documentation does mentions state and timers are not yet available 
in python streaming:

[https://beam.apache.org/documentation/sdks/python-streaming/]

 

It works and is implemented properly in FlinkRunner/fn_api/Portable runner on 
flink

[https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java]

cc [~mxm]

 

Thanks!

 

> Stateful Dataflow runner?
> -------------------------
>
>                 Key: BEAM-9655
>                 URL: https://issues.apache.org/jira/browse/BEAM-9655
>             Project: Beam
>          Issue Type: Wish
>          Components: runner-dataflow
>    Affects Versions: 2.19.0
>            Reporter: Léopold Boudard
>            Priority: Major
>
> Hi,
> I'm trying to use python portable DataflowRunner with a 
> [BagStateSpec|[https://beam.apache.org/releases/pydoc/2.6.0/apache_beam.transforms.userstate.html]].
>  Though I encounter followiung issue:
> {code:java}
> Traceback (most recent call last):
>   File "/Users/leopold/.pyenv/versions/3.6.0/lib/python3.6/runpy.py", line 
> 193, in _run_module_as_main
>     "__main__", mod_spec)
>   File "/Users/leopold/.pyenv/versions/3.6.0/lib/python3.6/runpy.py", line 
> 85, in _run_code
>     exec(code, run_globals)
>   File 
> "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/test_runner.py",
>  line 49, in <module>
>     run()
>   File 
> "/Users/leopold/workspace/BenchmarkListingStreaming/listing_beam_pipeline/test_runner.py",
>  line 44, in run
>     | 'write to file' >> WriteToText(known_args.output)
>   File 
> "/Users/leopold/.pyenv/versions/BenchmarkListingStreaming/lib/python3.6/site-packages/apache_beam/pipeline.py",
>  line 481, in __exit__
>     self.run().wait_until_finish()
>   File 
> "/Users/leopold/.pyenv/versions/BenchmarkListingStreaming/lib/python3.6/site-packages/apache_beam/runners/dataflow/dataflow_runner.py",
>  line 1449, in wait_until_finish
>     (self.state, getattr(self._runner, 'last_error_msg', None)), self)
> apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: 
> Dataflow pipeline failed. State: FAILED, Error:
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.6/site-packages/dataflow_worker/batchworker.py", line 
> 648, in do_work
>     work_executor.execute()
>   File "/usr/local/lib/python3.6/site-packages/dataflow_worker/executor.py", 
> line 176, in execute
>     op.start()
>   File "apache_beam/runners/worker/operations.py", line 649, in 
> apache_beam.runners.worker.operations.DoOperation.start
>   File "apache_beam/runners/worker/operations.py", line 651, in 
> apache_beam.runners.worker.operations.DoOperation.start
>   File "apache_beam/runners/worker/operations.py", line 652, in 
> apache_beam.runners.worker.operations.DoOperation.start
>   File "apache_beam/runners/worker/operations.py", line 261, in 
> apache_beam.runners.worker.operations.Operation.start
>   File "apache_beam/runners/worker/operations.py", line 266, in 
> apache_beam.runners.worker.operations.Operation.start
>   File "apache_beam/runners/worker/operations.py", line 597, in 
> apache_beam.runners.worker.operations.DoOperation.setup
>   File "apache_beam/runners/worker/operations.py", line 636, in 
> apache_beam.runners.worker.operations.DoOperation.setup
>   File "apache_beam/runners/common.py", line 866, in 
> apache_beam.runners.common.DoFnRunner.__init__
> Exception: Requested execution of a stateful DoFn, but no user state context 
> is available. This likely means that the current runner does not support the 
> execution of stateful DoFns.
> {code}
> I've also seen this issue in stackoverflow
> [https://stackoverflow.com/questions/55413690/does-google-dataflow-support-stateful-pipelines-developed-with-python-sdk]
>  
> Do you have any idea/ETA when this feature will be available with beam?
>  
> Thanks!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to