[ https://issues.apache.org/jira/browse/BEAM-8945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17548386#comment-17548386 ]
Danny McCormick commented on BEAM-8945:
---------------------------------------
This issue has been migrated to https://github.com/apache/beam/issues/20028
> DirectStreamObserver race condition
> -----------------------------------
>
> Key: BEAM-8945
> URL: https://issues.apache.org/jira/browse/BEAM-8945
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-harness
> Affects Versions: 2.16.0
> Reporter: Ankur Goenka
> Priority: P3
> Labels: Clarified
>
> The DirectStreamObserver can deadlock if the channel becomes unhealthy or
> is not ready. An extended period of unhealthiness should result in
> failure.
> This is supported by the following thread dumps, which show that one thread
> holds the lock on the actual stream observer while the remaining worker
> threads wait for that lock.
> The thread holding the lock on the stream observer is probably stuck in the
> while loop because the outboundObserver is not ready.
> There is also one thread waiting to execute onError, which means that the
> stream observer has become unhealthy and is probably never going to become
> ready.
> 100s of threads are blocked on:
> org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onNext(SynchronizedStreamObserver.java:46)
> org.apache.beam.runners.fnexecution.control.FnApiControlClient.handle(FnApiControlClient.java:84)
> org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.getProcessBundleProgress(RegisterAndProcessBundleOperation.java:393)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:347)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:334)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$107/1297335196.run(Unknown Source)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
> One thread holding the lock:
> State: TIMED_WAITING stack: —
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> java.util.concurrent.Phaser$QNode.block(Phaser.java:1142)
> java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
> java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1067)
> java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:796)
> org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:70)
> org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onNext(SynchronizedStreamObserver.java:46)
> org.apache.beam.runners.fnexecution.control.FnApiControlClient.handle(FnApiControlClient.java:84)
> org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.getProcessBundleProgress(RegisterAndProcessBundleOperation.java:393)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:347)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:334)
> org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$107/1297335196.run(Unknown Source)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
> One thread waiting to execute onError:
> State: BLOCKED stack: —
> org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onError(SynchronizedStreamObserver.java:53)
> org.apache.beam.runners.fnexecution.control.FnApiControlClient.closeAndTerminateOutstandingRequests(FnApiControlClient.java:117)
> org.apache.beam.runners.fnexecution.control.FnApiControlClient.access$300(FnApiControlClient.java:49)
> org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onError(FnApiControlClient.java:174)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:270)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:337)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:793)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
> cc: [~lcwik]
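
The lock interaction described in the issue can be modeled in a few lines of plain Java. This is a hypothetical sketch, not Beam's code: SerializedObserver is an illustrative stand-in for a wrapper that guards both onNext and onError with one monitor, as the SynchronizedStreamObserver frames in the dumps suggest, and a CountDownLatch that is only released at the very end stands in for an outbound channel that never becomes ready. The thread calling onError (here triggered by a simulated cancel, like the onCancel frames in the third dump) ends up BLOCKED behind the thread parked inside onNext.

```java
import java.util.concurrent.CountDownLatch;

/**
 * Hypothetical model of a wrapper that serializes onNext and onError on a single monitor.
 * Not Beam's SynchronizedStreamObserver; names and structure are illustrative only.
 */
class SerializedObserver {
  private final Object lock = new Object();
  // Stands in for "the outbound channel has become ready".
  private final CountDownLatch outboundReady;
  private boolean errored;

  SerializedObserver(CountDownLatch outboundReady) {
    this.outboundReady = outboundReady;
  }

  void onNext(String value) throws InterruptedException {
    synchronized (lock) {
      // Waits for readiness while still holding the monitor
      // (CountDownLatch.await parks the thread but does not release the monitor).
      outboundReady.await();
    }
  }

  void onError(Throwable t) {
    synchronized (lock) {
      // Would tear the stream down, but cannot enter while onNext holds the monitor.
      errored = true;
    }
  }

  /** Returns the state observed for the thread calling onError while onNext is stuck. */
  static Thread.State demo() throws InterruptedException {
    CountDownLatch ready = new CountDownLatch(1); // not released until the end of the demo
    SerializedObserver observer = new SerializedObserver(ready);

    Thread sender = new Thread(() -> {
      try {
        observer.onNext("progress request");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    sender.start();
    // Wait until the sender is parked inside onNext, holding the monitor.
    while (sender.getState() != Thread.State.WAITING) {
      Thread.sleep(5);
    }

    Thread canceller = new Thread(() -> observer.onError(new RuntimeException("call cancelled")));
    canceller.start();
    // Wait until the canceller stops running; the only place it can stop is the monitor.
    while (canceller.getState() == Thread.State.NEW
        || canceller.getState() == Thread.State.RUNNABLE) {
      Thread.sleep(5);
    }
    Thread.State observed = canceller.getState();

    // Simulate readiness so both threads can finish; in the reported case this never happens.
    ready.countDown();
    sender.join();
    canceller.join();
    return observed;
  }
}
```

Calling SerializedObserver.demo() returns Thread.State.BLOCKED, mirroring the "State: BLOCKED" dump: the error path cannot run until the stuck send gives up the monitor.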
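
The reporter's suggestion that an extended period of unhealthiness should result in failure could look roughly like the following. This is a hypothetical sketch under stated assumptions, not the change that was made in Beam: Outbound is an assumed stand-in for a flow-controlled stream such as gRPC's CallStreamObserver (which does expose isReady()), the Phaser is assumed to be advanced by the stream's on-ready callback, and BoundedWaitObserver, maxWait and poll are illustrative names. The wait still uses Phaser.awaitAdvanceInterruptibly with a timeout, the same call seen in the TIMED_WAITING dump, but gives up with a TimeoutException once maxWait has elapsed.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical sketch: waits for a flow-controlled outbound stream to become ready,
 * but fails with a TimeoutException after maxWait instead of waiting indefinitely.
 */
class BoundedWaitObserver<T> {
  /** Assumed stand-in for a flow-controlled stream such as gRPC's CallStreamObserver. */
  interface Outbound<V> {
    boolean isReady();

    void onNext(V value);
  }

  private final Outbound<T> outbound;
  // Assumed to be advanced (readyPhaser.arrive()) by the stream's "on ready" callback.
  private final Phaser readyPhaser;
  private final long maxWaitNanos;
  private final long pollNanos;

  BoundedWaitObserver(
      Outbound<T> outbound, Phaser readyPhaser, long maxWait, long poll, TimeUnit unit) {
    this.outbound = outbound;
    this.readyPhaser = readyPhaser;
    this.maxWaitNanos = unit.toNanos(maxWait);
    this.pollNanos = unit.toNanos(poll);
  }

  void onNext(T value) throws InterruptedException, TimeoutException {
    long deadline = System.nanoTime() + maxWaitNanos;
    while (true) {
      // Capture the phase before checking readiness so a signal that arrives
      // between the check and the wait is not missed.
      int phase = readyPhaser.getPhase();
      if (outbound.isReady()) {
        break;
      }
      long remaining = deadline - System.nanoTime();
      if (remaining <= 0) {
        // An extended period of not-ready is surfaced as a failure.
        throw new TimeoutException(
            "Outbound stream not ready after "
                + TimeUnit.NANOSECONDS.toMillis(maxWaitNanos) + " ms");
      }
      try {
        // Wake up on a readiness signal or after at most one poll interval, then re-check.
        readyPhaser.awaitAdvanceInterruptibly(
            phase, Math.min(remaining, pollNanos), TimeUnit.NANOSECONDS);
      } catch (TimeoutException pollTimeout) {
        // Poll interval elapsed without a signal; loop to re-check readiness and the deadline.
      }
    }
    outbound.onNext(value);
  }
}
```

Because the failure is thrown from onNext, a caller that invoked it inside a synchronized block leaves that block and releases the lock, so a pending onError can then run. The poll interval only bounds how long a missed or absent readiness signal can go unnoticed; maxWait is what turns a stalled channel into an error.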
--
This message was sent by Atlassian Jira
(v8.20.7#820007)