Sam Whittle created BEAM-13042:
----------------------------------
Summary: Prevent unexpected blocking in RegisterAndProcessBundleOperation hasFailed
Key: BEAM-13042
URL: https://issues.apache.org/jira/browse/BEAM-13042
Project: Beam
Issue Type: Improvement
Components: runner-dataflow
Reporter: Sam Whittle
Assignee: Sam Whittle
We have observed stack traces in stuck jobs as follows:
--- Threads (1): [Thread[pool-8-thread-1,5,main]] State: WAITING stack: ---
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.hasFailed(RegisterAndProcessBundleOperation.java:407)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:332)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:326)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$121/1203893465.run(Unknown Source)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
This code does not appear to expect blocking, since it checks isDone() before
calling get():
public boolean hasFailed() throws ExecutionException, InterruptedException {
  if (processBundleResponse != null
      && processBundleResponse.toCompletableFuture().isDone()) {
    return !processBundleResponse.toCompletableFuture().get().getError().isEmpty();
  } else {
    // At the very least, we don't know that this has failed yet.
    return false;
  }
}
I'm unsure why this is occurring, but it could be that the two calls to
toCompletableFuture() are returning different futures for some reason, or that
the completion stage is somehow changing from done to not done. In either case,
this could be fixed by replacing the isDone()/get() pair with a single call to
the CompletableFuture.getNow method, which is guaranteed not to block.
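
A minimal sketch of what the getNow-based check might look like, not the actual
patch. The Response class below is a hypothetical stand-in for the real
response type (only getError() is modeled), and the stage is passed in as a
parameter rather than read from the processBundleResponse field so that the
example is self-contained. Note one behavioral difference: if the future
completed exceptionally, getNow throws the unchecked CompletionException rather
than the checked ExecutionException that get() throws.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class GetNowSketch {
  // Hypothetical stand-in for the real response type; only getError() is modeled.
  static final class Response {
    private final String error;

    Response(String error) {
      this.error = error;
    }

    String getError() {
      return error;
    }
  }

  // Non-blocking failure check: toCompletableFuture() is called exactly once,
  // and getNow(null) returns null immediately if the future is not yet complete.
  static boolean hasFailed(CompletionStage<Response> stage) {
    if (stage == null) {
      return false;
    }
    Response response = stage.toCompletableFuture().getNow(null);
    if (response == null) {
      // At the very least, we don't know that this has failed yet.
      return false;
    }
    return !response.getError().isEmpty();
  }
}
```

Because there is only one call to toCompletableFuture() and no call to get(),
neither of the suspected causes above can make this version park the
progress-update thread.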
This affects the V1 Runner harness, which isn't generally used, but it might as
well be fixed.