This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 770c6fe39cc Make windmill service stream max backoff configurable
(#30475)
770c6fe39cc is described below
commit 770c6fe39cc90b19a0087d42dc9bb39c8e614cc6
Author: Arun Pandian <[email protected]>
AuthorDate: Mon Mar 4 02:20:29 2024 -0800
Make windmill service stream max backoff configurable (#30475)
---
.../runners/dataflow/options/DataflowStreamingPipelineOptions.java | 6 ++++++
.../beam/runners/dataflow/worker/StreamingDataflowWorker.java | 2 +-
.../dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java | 3 +--
3 files changed, 8 insertions(+), 3 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index d02155d8ce3..e8396c02726 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -205,6 +205,12 @@ public interface DataflowStreamingPipelineOptions extends
PipelineOptions {
void setWindmillServiceRpcChannelAliveTimeoutSec(int value);
+ @Description("Max backoff with which the windmill service stream failures
are retried")
+ @Default.Integer(30 * 1000) // 30s
+ int getWindmillServiceStreamMaxBackoffMillis();
+
+ void setWindmillServiceStreamMaxBackoffMillis(int value);
+
/**
* Factory for creating local Windmill address. Reads from system property
'windmill.hostport' for
* backwards compatibility.
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 6df500eed3e..7bc186af445 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -614,7 +614,7 @@ public class StreamingDataflowWorker {
Duration maxBackoff =
!options.isEnableStreamingEngine() &&
options.getLocalWindmillHostport() != null
? GrpcWindmillServer.LOCALHOST_MAX_BACKOFF
- : GrpcWindmillServer.MAX_BACKOFF;
+ :
Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis());
GrpcWindmillStreamFactory windmillStreamFactory =
GrpcWindmillStreamFactory.of(
JobHeader.newBuilder()
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index d6944b36034..b09e341f29e 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -88,7 +88,6 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("nullness") //
TODO(https://github.com/apache/beam/issues/20497
public final class GrpcWindmillServer extends WindmillServerStub {
public static final Duration LOCALHOST_MAX_BACKOFF = Duration.millis(500);
- public static final Duration MAX_BACKOFF = Duration.standardSeconds(30);
private static final Duration MIN_BACKOFF = Duration.millis(1);
private static final Logger LOG =
LoggerFactory.getLogger(GrpcWindmillServer.class);
private static final int DEFAULT_LOG_EVERY_N_FAILURES = 20;
@@ -113,7 +112,7 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
Consumer<List<Windmill.ComputationHeartbeatResponse>>
processHeartbeatResponses) {
this.options = options;
this.throttleTimers = StreamingEngineThrottleTimers.create();
- this.maxBackoff = MAX_BACKOFF;
+ this.maxBackoff =
Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis());
this.dispatcherClient = grpcDispatcherClient;
this.syncApplianceStub = null;
this.sendKeyedGetDataRequests =