This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c7fb9a0d5da [Dataflow Streaming] Use separate heartbeat streams based on job settings (#32511)
c7fb9a0d5da is described below

commit c7fb9a0d5da79018836c266355afbb33d92bb983
Author: Arun Pandian <[email protected]>
AuthorDate: Thu Sep 26 02:39:33 2024 -0700

    [Dataflow Streaming] Use separate heartbeat streams based on job settings (#32511)
---
 .../options/DataflowStreamingPipelineOptions.java  |   5 +-
 .../dataflow/worker/StreamingDataflowWorker.java   |  32 ++++-
 .../streaming/config/FakeGlobalConfigHandle.java   |  52 ++++++++
 .../work/refresh/StreamPoolHeartbeatSender.java    |  45 ++++++-
 .../refresh/StreamPoolHeartbeatSenderTest.java     | 132 +++++++++++++++++++++
 5 files changed, 253 insertions(+), 13 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index a761d38de1a..10df6e24f49 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -132,10 +132,9 @@ public interface DataflowStreamingPipelineOptions extends 
PipelineOptions {
 
+  /**
+   * Whether heartbeats use dedicated GetData streams. When unset ({@code null}), the worker
+   * selects the heartbeat pool from the streaming job settings if the {@code
+   * streaming_engine_use_job_settings_for_heartbeat_pool} experiment is enabled; otherwise an
+   * unset value behaves like {@code false} (heartbeats share streams with state reads).
+   */
   @Description(
       "If true, separate streaming rpcs will be used for heartbeats instead of 
sharing streams with state reads.")
-  @Default.Boolean(false)
-  boolean getUseSeparateWindmillHeartbeatStreams();
+  Boolean getUseSeparateWindmillHeartbeatStreams();
 
-  void setUseSeparateWindmillHeartbeatStreams(boolean value);
+  void setUseSeparateWindmillHeartbeatStreams(Boolean value);
 
   @Description("The number of streams to use for GetData requests.")
   @Default.Integer(1)
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 0dedd4f34fd..8b440c306f0 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -146,6 +146,8 @@ public final class StreamingDataflowWorker {
   private static final int DEFAULT_STATUS_PORT = 8081;
   private static final Random CLIENT_ID_GENERATOR = new Random();
   private static final String CHANNELZ_PATH = "/channelz";
+  /**
+   * Experiment name. When this experiment is enabled and the {@code
+   * useSeparateWindmillHeartbeatStreams} pipeline option is left unset, the heartbeat stream pool
+   * is chosen from the streaming job settings (global config) instead of the pipeline option.
+   */
+  public static final String 
STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =
+      "streaming_engine_use_job_settings_for_heartbeat_pool";
 
   private final WindmillStateCache stateCache;
   private final StreamingWorkerStatusPages statusPages;
@@ -253,12 +255,24 @@ public final class StreamingDataflowWorker {
               GET_DATA_STREAM_TIMEOUT,
               windmillServer::getDataStream);
       getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, 
getDataStreamPool);
-      heartbeatSender =
-          new StreamPoolHeartbeatSender(
-              options.getUseSeparateWindmillHeartbeatStreams()
-                  ? WindmillStreamPool.create(
-                      1, GET_DATA_STREAM_TIMEOUT, 
windmillServer::getDataStream)
-                  : getDataStreamPool);
+      // Experiment gates the logic till backend changes are rollback safe
+      if (!DataflowRunner.hasExperiment(
+              options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)
+          || options.getUseSeparateWindmillHeartbeatStreams() != null) {
+        heartbeatSender =
+            StreamPoolHeartbeatSender.Create(
+                
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
+                    ? separateHeartbeatPool(windmillServer)
+                    : getDataStreamPool);
+
+      } else {
+        heartbeatSender =
+            StreamPoolHeartbeatSender.Create(
+                separateHeartbeatPool(windmillServer),
+                getDataStreamPool,
+                configFetcher.getGlobalConfigHandle());
+      }
+
       stuckCommitDurationMillis =
           options.getStuckCommitDurationMillis() > 0 ? 
options.getStuckCommitDurationMillis() : 0;
       statusPagesBuilder
@@ -326,6 +340,11 @@ public final class StreamingDataflowWorker {
     LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
   }
 
+  private static WindmillStreamPool<GetDataStream> separateHeartbeatPool(
+      WindmillServerStub windmillServer) {
+    return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT, 
windmillServer::getDataStream);
+  }
+
   public static StreamingDataflowWorker 
fromOptions(DataflowWorkerHarnessOptions options) {
     long clientId = CLIENT_ID_GENERATOR.nextLong();
     MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
@@ -834,6 +853,7 @@ public final class StreamingDataflowWorker {
    */
   @AutoValue
   abstract static class BackgroundMemoryMonitor {
+
     private static BackgroundMemoryMonitor create(MemoryMonitor memoryMonitor) 
{
       return new AutoValue_StreamingDataflowWorker_BackgroundMemoryMonitor(
           memoryMonitor,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FakeGlobalConfigHandle.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FakeGlobalConfigHandle.java
new file mode 100644
index 00000000000..d4d73f5882b
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FakeGlobalConfigHandle.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+
+/**
+ * Fake {@link StreamingGlobalConfigHandle} for tests that allows replacing the current config at
+ * any time via {@link #setConfig(StreamingGlobalConfig)}.
+ *
+ * <p>All operations delegate to a {@link StreamingGlobalConfigHandleImpl}, so observer
+ * registration and notification behave exactly as in that implementation.
+ */
+@Internal
+@ThreadSafe
+public class FakeGlobalConfigHandle implements StreamingGlobalConfigHandle {
+
+  // Real handle that stores the current config and dispatches observer callbacks.
+  private final StreamingGlobalConfigHandleImpl globalConfigHandle;
+
+  /** Creates a handle whose initial config is {@code config}. */
+  public FakeGlobalConfigHandle(StreamingGlobalConfig config) {
+    this.globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    this.globalConfigHandle.setConfig(config);
+  }
+
+  @Override
+  public StreamingGlobalConfig getConfig() {
+    return globalConfigHandle.getConfig();
+  }
+
+  /** Replaces the current config; the delegate handles notifying registered observers. */
+  public void setConfig(StreamingGlobalConfig config) {
+    globalConfigHandle.setConfig(config);
+  }
+
+  @Override
+  public void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig> callback) {
+    globalConfigHandle.registerConfigObserver(callback);
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
index e571f89f142..fa36b11ffe5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;
 
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
 import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
 import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
@@ -27,19 +30,53 @@ import org.slf4j.LoggerFactory;
 /** StreamingEngine stream pool based implementation of {@link 
HeartbeatSender}. */
 @Internal
 public final class StreamPoolHeartbeatSender implements HeartbeatSender {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamPoolHeartbeatSender.class);
 
-  private final WindmillStreamPool<WindmillStream.GetDataStream> 
heartbeatStreamPool;
+  @Nonnull
+  private final 
AtomicReference<WindmillStreamPool<WindmillStream.GetDataStream>>
+      heartbeatStreamPool = new AtomicReference<>();
 
-  public StreamPoolHeartbeatSender(
+  private StreamPoolHeartbeatSender(
       WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool) {
-    this.heartbeatStreamPool = heartbeatStreamPool;
+    this.heartbeatStreamPool.set(heartbeatStreamPool);
+  }
+
+  /**
+   * Creates a {@link StreamPoolHeartbeatSender} that sends heartbeats on {@code pool}.
+   *
+   * @param pool stream pool used for heartbeats
+   */
+  public static StreamPoolHeartbeatSender Create(
+      @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> pool) {
+    StreamPoolHeartbeatSender sender = new StreamPoolHeartbeatSender(pool);
+    return sender;
+  }
+
+  /**
+   * Creates a StreamPoolHeartbeatSender that switches between the passed in stream pools depending
+   * on the global config.
+   *
+   * <p>The sender starts out on {@code getDataPool}. Each config delivered to the observer
+   * registered on {@code configHandle} re-selects the pool based on {@code
+   * userWorkerJobSettings().getUseSeparateWindmillHeartbeatStreams()}.
+   *
+   * @param dedicatedHeartbeatPool pool used when separate heartbeat streams are enabled.
+   * @param getDataPool pool shared with GetData requests; used when separate heartbeat streams are
+   *     disabled.
+   * @param configHandle source of global config updates that drive the pool selection.
+   * @return the new heartbeat sender.
+   */
+  public static StreamPoolHeartbeatSender Create(
+      @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> dedicatedHeartbeatPool,
+      @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
+      @Nonnull StreamingGlobalConfigHandle configHandle) {
+    // Default to getDataPool until a config arrives; the observer then switches to the dedicated
+    // pool if the job settings enable it (expected to happen before any elements are processed).
+    StreamPoolHeartbeatSender heartbeatSender = new StreamPoolHeartbeatSender(getDataPool);
+    configHandle.registerConfigObserver(
+        streamingGlobalConfig ->
+            heartbeatSender.heartbeatStreamPool.set(
+                streamingGlobalConfig
+                        .userWorkerJobSettings()
+                        .getUseSeparateWindmillHeartbeatStreams()
+                    ? dedicatedHeartbeatPool
+                    : getDataPool));
+    return heartbeatSender;
   }
 
   @Override
   public void sendHeartbeats(Heartbeats heartbeats) {
     try (CloseableStream<WindmillStream.GetDataStream> closeableStream =
-        heartbeatStreamPool.getCloseableStream()) {
+        heartbeatStreamPool.get().getCloseableStream()) {
       
closeableStream.stream().refreshActiveWork(heartbeats.heartbeatRequests().asMap());
     } catch (Exception e) {
       LOG.warn("Error occurred sending heartbeats=[{}].", heartbeats, e);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
new file mode 100644
index 00000000000..ed915088d0a
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Optional;
+import org.apache.beam.runners.dataflow.worker.FakeWindmillServer;
+import org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link StreamPoolHeartbeatSender} pool selection. */
+@RunWith(JUnit4.class)
+public class StreamPoolHeartbeatSenderTest {
+
+  // Registered as a @Rule so JUnit verifies it after each test; an ErrorCollector that is not a
+  // rule is never checked, so anything added to it would be silently ignored.
+  @Rule public ErrorCollector errorCollector = new ErrorCollector();
+
+  @Test
+  public void sendsHeartbeatsOnStream() {
+    FakeWindmillServer server = newFakeServer();
+    StreamPoolHeartbeatSender heartbeatSender =
+        StreamPoolHeartbeatSender.Create(newStreamPool(server));
+
+    heartbeatSender.sendHeartbeats(singleHeartbeat());
+    assertEquals(1, server.getGetDataRequests().size());
+  }
+
+  @Test
+  public void sendsHeartbeatsOnDedicatedStream() {
+    FakeWindmillServer dedicatedServer = newFakeServer();
+    FakeWindmillServer getDataServer = newFakeServer();
+    FakeGlobalConfigHandle configHandle =
+        new FakeGlobalConfigHandle(getGlobalConfig(/* useSeparateHeartbeatStreams= */ true));
+    StreamPoolHeartbeatSender heartbeatSender =
+        StreamPoolHeartbeatSender.Create(
+            newStreamPool(dedicatedServer), newStreamPool(getDataServer), configHandle);
+
+    heartbeatSender.sendHeartbeats(singleHeartbeat());
+    assertEquals(1, dedicatedServer.getGetDataRequests().size());
+    assertEquals(0, getDataServer.getGetDataRequests().size());
+
+    heartbeatSender.sendHeartbeats(singleHeartbeat());
+    assertEquals(2, dedicatedServer.getGetDataRequests().size());
+    assertEquals(0, getDataServer.getGetDataRequests().size());
+
+    // Turn off separate heartbeats: requests to getDataServer increase, dedicatedServer stays put.
+    configHandle.setConfig(getGlobalConfig(/* useSeparateHeartbeatStreams= */ false));
+    heartbeatSender.sendHeartbeats(singleHeartbeat());
+    assertEquals(2, dedicatedServer.getGetDataRequests().size());
+    assertEquals(1, getDataServer.getGetDataRequests().size());
+  }
+
+  @Test
+  public void sendsHeartbeatsOnGetDataStream() {
+    FakeWindmillServer dedicatedServer = newFakeServer();
+    FakeWindmillServer getDataServer = newFakeServer();
+    FakeGlobalConfigHandle configHandle =
+        new FakeGlobalConfigHandle(getGlobalConfig(/* useSeparateHeartbeatStreams= */ false));
+    StreamPoolHeartbeatSender heartbeatSender =
+        StreamPoolHeartbeatSender.Create(
+            newStreamPool(dedicatedServer), newStreamPool(getDataServer), configHandle);
+
+    heartbeatSender.sendHeartbeats(singleHeartbeat());
+    assertEquals(0, dedicatedServer.getGetDataRequests().size());
+    assertEquals(1, getDataServer.getGetDataRequests().size());
+
+    heartbeatSender.sendHeartbeats(singleHeartbeat());
+    assertEquals(0, dedicatedServer.getGetDataRequests().size());
+    assertEquals(2, getDataServer.getGetDataRequests().size());
+
+    // Turn on separate heartbeats: requests to dedicatedServer increase, getDataServer stays put.
+    configHandle.setConfig(getGlobalConfig(/* useSeparateHeartbeatStreams= */ true));
+    heartbeatSender.sendHeartbeats(singleHeartbeat());
+    assertEquals(1, dedicatedServer.getGetDataRequests().size());
+    assertEquals(2, getDataServer.getGetDataRequests().size());
+  }
+
+  /** Creates a fake server wired to this test's {@link #errorCollector}. */
+  private FakeWindmillServer newFakeServer() {
+    return new FakeWindmillServer(errorCollector, c -> Optional.empty());
+  }
+
+  /** Creates a GetData stream pool of one stream backed by {@code server}. */
+  private static WindmillStreamPool<WindmillStream.GetDataStream> newStreamPool(
+      FakeWindmillServer server) {
+    return WindmillStreamPool.create(1, Duration.standardSeconds(10), server::getDataStream);
+  }
+
+  /** Returns heartbeats holding a single request for key "key" with work token 123. */
+  private static Heartbeats singleHeartbeat() {
+    Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder();
+    heartbeatsBuilder
+        .heartbeatRequestsBuilder()
+        .put("key", HeartbeatRequest.newBuilder().setWorkToken(123).build());
+    return heartbeatsBuilder.build();
+  }
+
+  /** Builds a global config whose job settings toggle separate heartbeat streams. */
+  private static StreamingGlobalConfig getGlobalConfig(boolean useSeparateHeartbeatStreams) {
+    return StreamingGlobalConfig.builder()
+        .setUserWorkerJobSettings(
+            UserWorkerRunnerV1Settings.newBuilder()
+                .setUseSeparateWindmillHeartbeatStreams(useSeparateHeartbeatStreams)
+                .build())
+        .build();
+  }
+}

Reply via email to