This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 770c6fe39cc Make windmill service stream max backoff configurable (#30475)
770c6fe39cc is described below

commit 770c6fe39cc90b19a0087d42dc9bb39c8e614cc6
Author: Arun Pandian <[email protected]>
AuthorDate: Mon Mar 4 02:20:29 2024 -0800

    Make windmill service stream max backoff configurable (#30475)
---
 .../runners/dataflow/options/DataflowStreamingPipelineOptions.java  | 6 ++++++
 .../beam/runners/dataflow/worker/StreamingDataflowWorker.java       | 2 +-
 .../dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java    | 3 +--
 3 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index d02155d8ce3..e8396c02726 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -205,6 +205,12 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
 
   void setWindmillServiceRpcChannelAliveTimeoutSec(int value);
 
+  @Description("Max backoff with which the windmill service stream failures are retried")
+  @Default.Integer(30 * 1000) // 30s
+  int getWindmillServiceStreamMaxBackoffMillis();
+
+  void setWindmillServiceStreamMaxBackoffMillis(int value);
+
   /**
    * Factory for creating local Windmill address. Reads from system propery 'windmill.hostport' for
    * backwards compatibility.
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 6df500eed3e..7bc186af445 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -614,7 +614,7 @@ public class StreamingDataflowWorker {
         Duration maxBackoff =
             !options.isEnableStreamingEngine() && options.getLocalWindmillHostport() != null
                 ? GrpcWindmillServer.LOCALHOST_MAX_BACKOFF
-                : GrpcWindmillServer.MAX_BACKOFF;
+                : Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis());
         GrpcWindmillStreamFactory windmillStreamFactory =
             GrpcWindmillStreamFactory.of(
                     JobHeader.newBuilder()
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index d6944b36034..b09e341f29e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -88,7 +88,6 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("nullness") // TODO(https://github.com/apache/beam/issues/20497
 public final class GrpcWindmillServer extends WindmillServerStub {
   public static final Duration LOCALHOST_MAX_BACKOFF = Duration.millis(500);
-  public static final Duration MAX_BACKOFF = Duration.standardSeconds(30);
   private static final Duration MIN_BACKOFF = Duration.millis(1);
   private static final Logger LOG = LoggerFactory.getLogger(GrpcWindmillServer.class);
   private static final int DEFAULT_LOG_EVERY_N_FAILURES = 20;
@@ -113,7 +112,7 @@ public final class GrpcWindmillServer extends WindmillServerStub {
       Consumer<List<Windmill.ComputationHeartbeatResponse>> processHeartbeatResponses) {
     this.options = options;
     this.throttleTimers = StreamingEngineThrottleTimers.create();
-    this.maxBackoff = MAX_BACKOFF;
+    this.maxBackoff = Duration.millis(options.getWindmillServiceStreamMaxBackoffMillis());
     this.dispatcherClient = grpcDispatcherClient;
     this.syncApplianceStub = null;
     this.sendKeyedGetDataRequests =

Reply via email to