[ https://issues.apache.org/jira/browse/BEAM-8226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16929348#comment-16929348 ]
Ahmet Altay commented on BEAM-8226:
-----------------------------------

Would this be addressed by Luke's recent change to increase the number of SDK harness threads?

The JRH seems to be throttling; it is not clear why, here:
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java#L171

The SDK harness seems to be just waiting, and is probably not processing the work that would normally yield more processed elements and cause the condition to be signalled:
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/RemoteGrpcPortWriteOperation.java#L156

Can we just remove this throttling, or increase the allowed rate? Would the resilience changes (restart when stuck) address this issue? (I am guessing not, since everything here is alive.)

> Python Streaming Pipeline getting stuck in dataflow
> ---------------------------------------------------
>
> Key: BEAM-8226
> URL:
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-harness, sdk-py-harness
> Reporter: Ankur Goenka
> Priority: Blocker
> Fix For: 2.16.0
>
> Python streaming pipelines are getting stuck with the following error when running on Dataflow.
>
> Relevant thread stack:
>
> --- Threads (4): [Thread[Thread-19,1,main], Thread[Thread-20,1,main], Thread[Thread-21,1,main], Thread[Thread-22,1,main]] State: WAITING stack: ---
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.maybeWait(RemoteGrpcPortWriteOperation.java:175)
> org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.process(RemoteGrpcPortWriteOperation.java:196)
> org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
> org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
> org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1316)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:149)
> org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1049)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
> For Python:
>
> --- Thread #139819623634688 name: ThreadPoolExecutor-1_0 ---
> File "/usr/local/lib/python3.6/threading.py", line 884, in _bootstrap
>     self._bootstrap_inner()
> File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
>     self.run()
> File "/usr/local/lib/python3.6/threading.py", line 864, in run
>     self._target(*self._args, **self._kwargs)
> File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 69, in _worker
>     work_item.run()
> File "/usr/local/lib/python3.6/concurrent/futures/thread.py", line 56, in run
>     result = self.fn(*self.args, **self.kwargs)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in task
>     self._execute(lambda: worker.do_instruction(work), work)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 158, in _execute
>     response = task()
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 191, in <lambda>
>     self._execute(lambda: worker.do_instruction(work), work)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
>     request.instruction_id)
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 661, in process_bundle
>     instruction_id, expected_transforms):
> File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/data_plane.py", line 213, in input_elements
>     data = received.get(timeout=1)
> File "/usr/local/lib/python3.6/queue.py", line 173, in get
>     self.not_empty.wait(remaining)
> File "/usr/local/lib/python3.6/threading.py", line 299, in wait
>     gotit = waiter.acquire(True, timeout)

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
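The flow-control pattern behind RemoteGrpcPortWriteOperation.maybeWait(), a producer parking on a condition until the consumer signals that it has processed more elements, can be sketched as below. This is a minimal illustrative sketch, not Beam's actual code: the ThrottleSketch class, the MAX_AHEAD window, and the sent/processed counters are all invented for the example. It also shows the failure mode in the stacks above: if the consumer side never reports progress, the producer parks in await() indefinitely.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only (not Beam's implementation): a writer that
// throttles itself so it never runs more than MAX_AHEAD elements ahead
// of the consumer, in the spirit of RemoteGrpcPortWriteOperation.maybeWait().
public class ThrottleSketch {
    static final int MAX_AHEAD = 10; // hypothetical flow-control window
    final ReentrantLock lock = new ReentrantLock();
    final Condition elementProcessed = lock.newCondition();
    long sent = 0;
    long processed = 0;

    // Producer side: block until the consumer has nearly caught up, then send.
    // If the consumer never makes progress, this parks forever, matching the
    // WAITING thread stacks reported in the issue.
    void maybeWait() throws InterruptedException {
        lock.lock();
        try {
            while (sent - processed >= MAX_AHEAD) {
                elementProcessed.await();
            }
            sent++;
        } finally {
            lock.unlock();
        }
    }

    // Consumer side: record progress and wake a waiting producer.
    void markProcessed() {
        lock.lock();
        try {
            processed++;
            elementProcessed.signal();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        ThrottleSketch t = new ThrottleSketch();
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                t.markProcessed();
            }
        });
        consumer.start();
        for (int i = 0; i < 100; i++) {
            t.maybeWait(); // throttled send
        }
        consumer.join();
        System.out.println("sent=" + t.sent + " processed=" + t.processed);
        // prints: sent=100 processed=100
    }
}
```

In these terms, removing the throttling (as the comment asks) would mean dropping the while loop, and raising the allowed rate would mean enlarging the window; both trade a larger in-flight backlog for a producer that blocks less.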