Sam Whittle created BEAM-13042:
----------------------------------

             Summary: Prevent unexpected blocking in RegisterAndProcessBundleOperation hasFailed
                 Key: BEAM-13042
                 URL: https://issues.apache.org/jira/browse/BEAM-13042
             Project: Beam
          Issue Type: Improvement
          Components: runner-dataflow
            Reporter: Sam Whittle
            Assignee: Sam Whittle


We have observed stack traces in stuck jobs as follows:
--- Threads (1): [Thread[pool-8-thread-1,5,main]] State: WAITING stack: ---
  sun.misc.Unsafe.park(Native Method)
  java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
  java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
  java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
  java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
  org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.hasFailed(RegisterAndProcessBundleOperation.java:407)
  org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:332)
  org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:326)
  org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker$$Lambda$121/1203893465.run(Unknown Source)
  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  java.lang.Thread.run(Thread.java:748)
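
Why one blocked call is enough to stall progress reporting: the frames above show hasFailed running inside a periodic task on a ScheduledThreadPoolExecutor (FutureTask.runAndReset). The sketch below uses only JDK classes; the class name, the 10 ms period and the single-threaded scheduler are illustrative assumptions, since the report does not say how the worker's pool is configured. It shows that when one run of a fixed-rate task blocks indefinitely on CompletableFuture.get(), later runs never start, because a periodic task is only rescheduled after the current run returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Beam.
class StalledProgressUpdateDemo {
  // Schedules a fixed-rate task whose first run blocks forever, and returns
  // how many runs started within observeMillis.
  static int countStartsWhenFirstRunBlocks(long observeMillis) throws InterruptedException {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    CompletableFuture<String> neverCompleted = new CompletableFuture<>();
    AtomicInteger starts = new AtomicInteger();
    executor.scheduleAtFixedRate(() -> {
      starts.incrementAndGet();
      try {
        // Unbounded wait, like the get() call in the trace above.
        neverCompleted.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
    }, 0, 10, TimeUnit.MILLISECONDS);
    Thread.sleep(observeMillis);
    // shutdownNow interrupts the blocked get(); a stopped executor does not
    // reschedule the periodic task afterwards.
    executor.shutdownNow();
    executor.awaitTermination(1, TimeUnit.SECONDS);
    return starts.get();
  }

  public static void main(String[] args) throws InterruptedException {
    // Roughly 20 periods elapse, but only the first run ever starts.
    System.out.println(countStartsWhenFirstRunBlocks(200)); // prints 1
  }
}
```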


The hasFailed() code does not appear to expect blocking, since it checks isDone() before calling get():

  public boolean hasFailed() throws ExecutionException, InterruptedException {
    if (processBundleResponse != null
        && processBundleResponse.toCompletableFuture().isDone()) {
      return !processBundleResponse.toCompletableFuture().get().getError().isEmpty();
    } else {
      // At the very least, we don't know that this has failed yet.
      return false;
    }
  }

I'm unsure why this is occurring, but it could be that the two calls to 
toCompletableFuture() return different futures for some reason, or that the 
completion stage somehow changes from done back to not done. In either case, 
this could be fixed by making a single call to CompletableFuture.getNow(), which 
is guaranteed not to block.
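
A minimal sketch of the getNow()-based approach, not the actual Beam change. It assumes a hypothetical Response class in place of the real bundle response message (only getError() is modeled) and a plain CompletionStage field. It calls toCompletableFuture() once and then getNow(null), which returns the result if the future has completed and null otherwise, so it cannot block. One behavioral difference from the original: if the future completed exceptionally, getNow throws an unchecked CompletionException instead of the checked ExecutionException thrown by get().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical stand-in for RegisterAndProcessBundleOperation's hasFailed logic.
class NonBlockingHasFailed {
  // Hypothetical response type; only getError() matters for this check.
  static final class Response {
    private final String error;

    Response(String error) {
      this.error = error;
    }

    String getError() {
      return error;
    }
  }

  private final CompletionStage<Response> processBundleResponse;

  NonBlockingHasFailed(CompletionStage<Response> processBundleResponse) {
    this.processBundleResponse = processBundleResponse;
  }

  boolean hasFailed() {
    if (processBundleResponse == null) {
      return false;
    }
    // Single toCompletableFuture() call; getNow(null) returns null immediately
    // if the response has not arrived, so this never waits.
    // Throws CompletionException if the future completed exceptionally.
    Response response = processBundleResponse.toCompletableFuture().getNow(null);
    // null means not done yet: we don't know that this has failed.
    return response != null && !response.getError().isEmpty();
  }

  public static void main(String[] args) {
    CompletableFuture<Response> pending = new CompletableFuture<>();
    System.out.println(new NonBlockingHasFailed(pending).hasFailed()); // prints false, without blocking
    System.out.println(
        new NonBlockingHasFailed(CompletableFuture.completedFuture(new Response("boom")))
            .hasFailed()); // prints true
  }
}
```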

This affects the V1 runner harness, which isn't generally used, but it might as 
well be fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
