[ https://issues.apache.org/jira/browse/BEAM-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía updated BEAM-8226:
-------------------------------
    Status: Open  (was: Triage Needed)

> Python Streaming Pipeline getting stuck in dataflow
> ---------------------------------------------------
>
>                 Key: BEAM-8226
>                 URL: https://issues.apache.org/jira/browse/BEAM-8226
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-harness, sdk-py-harness
>            Reporter: Ankur Goenka
>            Assignee: Yichi Zhang
>            Priority: Blocker
>             Fix For: 2.16.0
>
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Python streaming pipelines are getting stuck with the following error when running on Dataflow.
>
> Relevant thread stack
> --- Threads (4): [Thread[Thread-19,1,main], Thread[Thread-20,1,main], Thread[Thread-21,1,main], Thread[Thread-22,1,main]] State: WAITING stack: ---
>   sun.misc.Unsafe.park(Native Method)
>   java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>   java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
>   org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.maybeWait(RemoteGrpcPortWriteOperation.java:175)
>   org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:196)
>   org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
>   org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
>   org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
>   org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
>   org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
>   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
>   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
>   org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   java.lang.Thread.run(Thread.java:745)
>
> For Python
> --- Thread #139819623634688 name: ThreadPoolExecutor-1_0 ---
>   File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
>     self._bootstrap_inner()
>   File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
>     self.run()
>   File "/usr/local/lib/python3.6/threading.py", line 864, in run
>     self._target(*self._args, **self._kwargs)
>   File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 69, in _worker
>     work_item.run()
>   File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 56, in run
>     result = self.fn(*self.args, **self.kwargs)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in task
>     self._execute(lambda: worker.do_instruction(work), work)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
>     response = task()
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
>     self._execute(lambda: worker.do_instruction(work), work)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
>     request.instruction_id)
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 661, in process_bundle
>     instruction_id, expected_transforms):
>   File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 213, in input_elements
>     data = received.get(timeout=1)
>   File "/usr/local/lib/python3.6/queue.py", line 173, in get
>     self.not_empty.wait(remaining)
>   File "/usr/local/lib/python3.6/threading.py", line 299, in wait
>     gotit = waiter.acquire(True, timeout)
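
The Python trace ends in `received.get(timeout=1)` inside `input_elements`, which suggests the SDK worker is polling a queue for incoming data while the Java threads are parked in `maybeWait`. As a rough illustration of that polling pattern only (this is not Beam's actual `data_plane.py` code; the function name `poll_elements`, the `done` event, and the `poll_timeout` parameter are hypothetical), a consumer that repeatedly waits on a queue with a timeout might look like this:

```python
import queue
import threading


def poll_elements(received, done, poll_timeout=1.0):
    """Yield items from `received` until `done` is set and the queue is empty.

    Hypothetical sketch: each get() blocks for at most `poll_timeout` seconds,
    so a thread dump taken while no data arrives shows the thread waiting
    inside queue.get(), much like the trace in this report.
    """
    while True:
        try:
            item = received.get(timeout=poll_timeout)
        except queue.Empty:
            # Nothing arrived within the timeout; stop only if the producer
            # has signalled completion, otherwise keep polling.
            if done.is_set():
                return
            continue
        yield item
```

If the producer never sends data and never signals completion, a loop of this shape polls indefinitely, which would be consistent with a worker that appears stuck without raising an error.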