Ankur Goenka created BEAM-8945:
----------------------------------
Summary: DirectStreamObserver race condition
Key: BEAM-8945
URL: https://issues.apache.org/jira/browse/BEAM-8945
Project: Beam
Issue Type: Bug
Components: sdk-java-harness
Affects Versions: 2.16.0
Reporter: Ankur Goenka
Assignee: Ankur Goenka
The DirectStreamObserver can get into a deadlock if the channel becomes
unhealthy or is not ready. An extended period of unhealthiness should result
in a failure.
This is supported by the following thread dumps, where one thread is
holding the lock on the actual stream observer while the remaining worker
threads are waiting on that lock.
The thread holding the lock on the stream observer is probably stuck in the
while loop because the outboundObserver is not ready.
There is also one thread waiting to execute onError, which means that
the stream observer has become unhealthy and is probably never going to become ready.
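As an illustration of what "an extended period of unhealthiness should result in a failure" could look like, here is a minimal sketch of a readiness wait with an overall deadline. It is not the DirectStreamObserver code and not a proposed patch: the class name BoundedReadyWait, the isReady supplier (standing in for the outboundObserver readiness check), and the pollMillis/maxWaitMillis parameters are all hypothetical. It only assumes, as the thread dump suggests, that the waiter parks on a Phaser that some on-ready callback advances.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch: wait for readiness, but give up after a deadline. */
final class BoundedReadyWait {
  private final Phaser phaser;            // assumed to be advanced by an on-ready callback
  private final BooleanSupplier isReady;  // stand-in for the outbound observer's readiness check
  private final long pollMillis;          // how long each individual wait may park
  private final long maxWaitMillis;       // total time allowed before failing

  BoundedReadyWait(Phaser phaser, BooleanSupplier isReady, long pollMillis, long maxWaitMillis) {
    this.phaser = phaser;
    this.isReady = isReady;
    this.pollMillis = pollMillis;
    this.maxWaitMillis = maxWaitMillis;
  }

  /** Returns once isReady reports true; throws if maxWaitMillis elapses first. */
  void awaitReady() {
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxWaitMillis);
    // Read the phase before checking readiness so a concurrent advance is not missed.
    int phase = phaser.getPhase();
    while (!isReady.getAsBoolean()) {
      if (System.nanoTime() - deadlineNanos >= 0) {
        // Fail instead of looping forever while holding the caller's lock.
        throw new IllegalStateException(
            "Output channel not ready after " + maxWaitMillis + " ms");
      }
      try {
        phase = phaser.awaitAdvanceInterruptibly(phase, pollMillis, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        // No readiness signal in this interval; loop to re-check readiness and the deadline.
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for readiness", e);
      }
    }
  }
}
```

With a supplier that never reports ready, for example new BoundedReadyWait(new Phaser(1), () -> false, 10, 50).awaitReady(), the call throws IllegalStateException after roughly 50 ms instead of parking indefinitely. In the scenario described above, that would release the lock so that the blocked onNext callers and the pending onError could proceed.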
Hundreds of threads are blocked on:
org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onNext(SynchronizedStreamObserver.java:46)
org.apache.beam.runners.fnexecution.control.FnApiControlClient.handle(FnApiControlClient.java:84)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.getProcessBundleProgress(RegisterAndProcessBundleOperation.java:393)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:347)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:334)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$107/1297335196.run(Unknown Source)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
One thread holding the lock:
State: TIMED_WAITING stack: ---
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
java.util.concurrent.Phaser$QNode.block(Phaser.java:1142)
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
java.util.concurrent.Phaser.internalAwaitAdvance(Phaser.java:1067)
java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:796)
org.apache.beam.sdk.fn.stream.DirectStreamObserver.onNext(DirectStreamObserver.java:70)
org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onNext(SynchronizedStreamObserver.java:46)
org.apache.beam.runners.fnexecution.control.FnApiControlClient.handle(FnApiControlClient.java:84)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.getProcessBundleProgress(RegisterAndProcessBundleOperation.java:393)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:347)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:334)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$107/1297335196.run(Unknown Source)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
One thread waiting to execute onError:
State: BLOCKED stack: ---
org.apache.beam.sdk.fn.stream.SynchronizedStreamObserver.onError(SynchronizedStreamObserver.java:53)
org.apache.beam.runners.fnexecution.control.FnApiControlClient.closeAndTerminateOutstandingRequests(FnApiControlClient.java:117)
org.apache.beam.runners.fnexecution.control.FnApiControlClient.access$300(FnApiControlClient.java:49)
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onError(FnApiControlClient.java:174)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:270)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:337)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:793)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
cc: [~lcwik]