This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f8fbbfa  [BEAM-12661] Fix stuck GetData Windmill calls (#15224)
f8fbbfa is described below

commit f8fbbfa309ac88848057de694d4cc1cba3eaa92a
Author: slavachernyak <[email protected]>
AuthorDate: Mon Jul 26 16:49:29 2021 -0700

    [BEAM-12661] Fix stuck GetData Windmill calls (#15224)
    
    [BEAM-12661] Fix stuck GetData Windmill calls
---
 .../beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 81ae092..3c06ee9 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -1525,8 +1525,8 @@ public class GrpcWindmillServer extends 
WindmillServerStub {
 
                 try {
                   blockedStartMs.set(Instant.now().getMillis());
-                  current = queue.take();
-                  if (current != POISON_PILL) {
+                  current = queue.poll(180, TimeUnit.SECONDS);
+                  if (current != null && current != POISON_PILL) {
                     return true;
                   }
                   if (cancelled.get()) {
@@ -1535,7 +1535,8 @@ public class GrpcWindmillServer extends 
WindmillServerStub {
                   if (complete.get()) {
                     return false;
                   }
-                  throw new IllegalStateException("Got poison pill but stream 
is not done.");
+                  throw new IllegalStateException(
+                      "Got poison pill or timeout but stream is not done.");
                 } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new CancellationException();

Reply via email to