This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f8fbbfa [BEAM-12661] Fix stuck GetData Windmill calls (#15224)
f8fbbfa is described below
commit f8fbbfa309ac88848057de694d4cc1cba3eaa92a
Author: slavachernyak <[email protected]>
AuthorDate: Mon Jul 26 16:49:29 2021 -0700
[BEAM-12661] Fix stuck GetData Windmill calls (#15224)
[BEAM-12661] Fix stuck GetData Windmill calls
---
.../beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 81ae092..3c06ee9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -1525,8 +1525,8 @@ public class GrpcWindmillServer extends WindmillServerStub {
try {
blockedStartMs.set(Instant.now().getMillis());
- current = queue.take();
- if (current != POISON_PILL) {
+ current = queue.poll(180, TimeUnit.SECONDS);
+ if (current != null && current != POISON_PILL) {
return true;
}
if (cancelled.get()) {
@@ -1535,7 +1535,8 @@ public class GrpcWindmillServer extends WindmillServerStub {
if (complete.get()) {
return false;
}
- throw new IllegalStateException("Got poison pill but stream is not done.");
+ throw new IllegalStateException(
+ "Got poison pill or timeout but stream is not done.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CancellationException();