This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c7fb9a0d5da [Dataflow Streaming] Use separate heartbeat streams based
on job settings (#32511)
c7fb9a0d5da is described below
commit c7fb9a0d5da79018836c266355afbb33d92bb983
Author: Arun Pandian <[email protected]>
AuthorDate: Thu Sep 26 02:39:33 2024 -0700
[Dataflow Streaming] Use separate heartbeat streams based on job settings
(#32511)
---
.../options/DataflowStreamingPipelineOptions.java | 5 +-
.../dataflow/worker/StreamingDataflowWorker.java | 32 ++++-
.../streaming/config/FakeGlobalConfigHandle.java | 52 ++++++++
.../work/refresh/StreamPoolHeartbeatSender.java | 45 ++++++-
.../refresh/StreamPoolHeartbeatSenderTest.java | 132 +++++++++++++++++++++
5 files changed, 253 insertions(+), 13 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index a761d38de1a..10df6e24f49 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -132,10 +132,9 @@ public interface DataflowStreamingPipelineOptions extends
PipelineOptions {
+ /**
+ * Whether heartbeats are sent on dedicated GetData streams instead of sharing streams with state
+ * reads.
+ *
+ * <p>Boxed (no {@code @Default}) so that "unset" ({@code null}) can be told apart from an explicit
+ * {@code false}; callers must null-check before unboxing. The streaming worker treats {@code null}
+ * as {@code false}, unless the {@code streaming_engine_use_job_settings_for_heartbeat_pool}
+ * experiment is enabled, in which case the streaming job settings decide.
+ */
@Description(
"If true, separate streaming rpcs will be used for heartbeats instead of
sharing streams with state reads.")
- @Default.Boolean(false)
- boolean getUseSeparateWindmillHeartbeatStreams();
+ Boolean getUseSeparateWindmillHeartbeatStreams();
+ /** Sets the value; {@code null} leaves the choice unset. */
- void setUseSeparateWindmillHeartbeatStreams(boolean value);
+ void setUseSeparateWindmillHeartbeatStreams(Boolean value);
@Description("The number of streams to use for GetData requests.")
@Default.Integer(1)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 0dedd4f34fd..8b440c306f0 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -146,6 +146,8 @@ public final class StreamingDataflowWorker {
private static final int DEFAULT_STATUS_PORT = 8081;
private static final Random CLIENT_ID_GENERATOR = new Random();
private static final String CHANNELZ_PATH = "/channelz";
+ /**
+ * Experiment that, when enabled and {@code useSeparateWindmillHeartbeatStreams} is unset, makes the
+ * worker choose the heartbeat stream pool from the streaming job settings instead of the option.
+ */
+ public static final String
STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =
+ "streaming_engine_use_job_settings_for_heartbeat_pool";
private final WindmillStateCache stateCache;
private final StreamingWorkerStatusPages statusPages;
@@ -253,12 +255,24 @@ public final class StreamingDataflowWorker {
GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream);
getDataClient = new StreamPoolGetDataClient(getDataMetricTracker,
getDataStreamPool);
- heartbeatSender =
- new StreamPoolHeartbeatSender(
- options.getUseSeparateWindmillHeartbeatStreams()
- ? WindmillStreamPool.create(
- 1, GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream)
- : getDataStreamPool);
+ // Experiment gates the logic till backend changes are rollback safe
+ if (!DataflowRunner.hasExperiment(
+ options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)
+ || options.getUseSeparateWindmillHeartbeatStreams() != null) {
+ heartbeatSender =
+ StreamPoolHeartbeatSender.Create(
+
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
+ ? separateHeartbeatPool(windmillServer)
+ : getDataStreamPool);
+
+ } else {
+ heartbeatSender =
+ StreamPoolHeartbeatSender.Create(
+ separateHeartbeatPool(windmillServer),
+ getDataStreamPool,
+ configFetcher.getGlobalConfigHandle());
+ }
+
stuckCommitDurationMillis =
options.getStuckCommitDurationMillis() > 0 ?
options.getStuckCommitDurationMillis() : 0;
statusPagesBuilder
@@ -326,6 +340,11 @@ public final class StreamingDataflowWorker {
LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport());
}
+ private static WindmillStreamPool<GetDataStream> separateHeartbeatPool(
+ WindmillServerStub windmillServer) {
+ return WindmillStreamPool.create(1, GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream);
+ }
+
public static StreamingDataflowWorker
fromOptions(DataflowWorkerHarnessOptions options) {
long clientId = CLIENT_ID_GENERATOR.nextLong();
MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
@@ -834,6 +853,7 @@ public final class StreamingDataflowWorker {
*/
@AutoValue
abstract static class BackgroundMemoryMonitor {
+
private static BackgroundMemoryMonitor create(MemoryMonitor memoryMonitor)
{
return new AutoValue_StreamingDataflowWorker_BackgroundMemoryMonitor(
memoryMonitor,
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FakeGlobalConfigHandle.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FakeGlobalConfigHandle.java
new file mode 100644
index 00000000000..d4d73f5882b
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/FakeGlobalConfigHandle.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.config;
+
+import java.util.function.Consumer;
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.sdk.annotations.Internal;
+
+/**
+ * Fake {@link StreamingGlobalConfigHandle} for tests that lets callers replace the current config
+ * at any time via {@link #setConfig(StreamingGlobalConfig)}.
+ *
+ * <p>Storage and observer notification are delegated to a {@link StreamingGlobalConfigHandleImpl}.
+ */
+@Internal
+@ThreadSafe
+public class FakeGlobalConfigHandle implements StreamingGlobalConfigHandle {
+
+  private final StreamingGlobalConfigHandleImpl globalConfigHandle;
+
+  /** Creates a handle whose initial config is {@code config}. */
+  public FakeGlobalConfigHandle(StreamingGlobalConfig config) {
+    this.globalConfigHandle = new StreamingGlobalConfigHandleImpl();
+    this.globalConfigHandle.setConfig(config);
+  }
+
+  @Override
+  public StreamingGlobalConfig getConfig() {
+    return globalConfigHandle.getConfig();
+  }
+
+  /** Replaces the current config; observers are notified through the delegate handle. */
+  public void setConfig(StreamingGlobalConfig config) {
+    globalConfigHandle.setConfig(config);
+  }
+
+  @Override
+  public void registerConfigObserver(@Nonnull Consumer<StreamingGlobalConfig> callback) {
+    globalConfigHandle.registerConfigObserver(callback);
+  }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
index e571f89f142..fa36b11ffe5 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSender.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nonnull;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
@@ -27,19 +30,53 @@ import org.slf4j.LoggerFactory;
/** StreamingEngine stream pool based implementation of {@link
HeartbeatSender}. */
@Internal
public final class StreamPoolHeartbeatSender implements HeartbeatSender {
+
private static final Logger LOG =
LoggerFactory.getLogger(StreamPoolHeartbeatSender.class);
- private final WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool;
+  /**
+   * Pool that {@link #sendHeartbeats} draws its stream from. Held in an {@link AtomicReference} so
+   * the config observer registered by the three-argument {@code Create} can replace it after
+   * construction.
+   */
+  @Nonnull
+  private final AtomicReference<WindmillStreamPool<WindmillStream.GetDataStream>>
+      heartbeatStreamPool;
- public StreamPoolHeartbeatSender(
+  private StreamPoolHeartbeatSender(
      WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool) {
- this.heartbeatStreamPool = heartbeatStreamPool;
+    this.heartbeatStreamPool = new AtomicReference<>(heartbeatStreamPool);
+  }
+
+  /**
+   * Creates a sender that always sends heartbeats on {@code heartbeatStreamPool}.
+   *
+   * @param heartbeatStreamPool pool to draw heartbeat streams from.
+   * @return a new sender bound to the given pool.
+   */
+  public static StreamPoolHeartbeatSender Create(
+      @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> heartbeatStreamPool) {
+    StreamPoolHeartbeatSender sender = new StreamPoolHeartbeatSender(heartbeatStreamPool);
+    return sender;
+  }
+
+  /**
+   * Creates a sender that switches between two stream pools based on the streaming global config.
+   *
+   * <p>Heartbeats initially go to {@code getDataPool}. Every config delivered to the observer
+   * registered on {@code configHandle} selects {@code dedicatedHeartbeatPool} when the user worker
+   * job settings enable separate Windmill heartbeat streams, and {@code getDataPool} otherwise.
+   *
+   * @param dedicatedHeartbeatPool pool used when separate heartbeat streams are enabled.
+   * @param getDataPool pool used when separate heartbeat streams are disabled; also the initial
+   *     pool.
+   * @param configHandle handle whose config updates drive the pool selection.
+   * @return a new sender observing {@code configHandle}.
+   */
+  public static StreamPoolHeartbeatSender Create(
+      @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> dedicatedHeartbeatPool,
+      @Nonnull WindmillStreamPool<WindmillStream.GetDataStream> getDataPool,
+      @Nonnull StreamingGlobalConfigHandle configHandle) {
+    StreamPoolHeartbeatSender sender = new StreamPoolHeartbeatSender(getDataPool);
+    // NOTE(review): if configHandle already holds a config, the observer is expected to fire on
+    // registration; otherwise heartbeats stay on getDataPool until the first config arrives. This
+    // class cannot itself guarantee that happens before any work is processed.
+    configHandle.registerConfigObserver(
+        config -> {
+          boolean useDedicatedPool =
+              config.userWorkerJobSettings().getUseSeparateWindmillHeartbeatStreams();
+          sender.heartbeatStreamPool.set(useDedicatedPool ? dedicatedHeartbeatPool : getDataPool);
+        });
+    return sender;
   }
@Override
public void sendHeartbeats(Heartbeats heartbeats) {
try (CloseableStream<WindmillStream.GetDataStream> closeableStream =
- heartbeatStreamPool.getCloseableStream()) {
+ heartbeatStreamPool.get().getCloseableStream()) {
closeableStream.stream().refreshActiveWork(heartbeats.heartbeatRequests().asMap());
} catch (Exception e) {
LOG.warn("Error occurred sending heartbeats=[{}].", heartbeats, e);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
new file mode 100644
index 00000000000..ed915088d0a
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/StreamPoolHeartbeatSenderTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Optional;
+import org.apache.beam.runners.dataflow.worker.FakeWindmillServer;
+import org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle;
+import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ErrorCollector;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class StreamPoolHeartbeatSenderTest {
+
+  private static final Duration STREAM_TIMEOUT = Duration.standardSeconds(10);
+
+  /**
+   * Shared by every {@link FakeWindmillServer} in a test. Registered as a rule so that failures the
+   * fakes record are reported instead of being silently dropped.
+   */
+  @Rule public final ErrorCollector errorCollector = new ErrorCollector();
+
+  @Test
+  public void sendsHeartbeatsOnStream() {
+    FakeWindmillServer server = newServer();
+    StreamPoolHeartbeatSender heartbeatSender = StreamPoolHeartbeatSender.Create(poolFor(server));
+
+    heartbeatSender.sendHeartbeats(singleHeartbeat());
+
+    assertEquals(1, server.getGetDataRequests().size());
+  }
+
+  @Test
+  public void sendsHeartbeatsOnDedicatedStream() {
+    FakeWindmillServer dedicatedServer = newServer();
+    FakeWindmillServer getDataServer = newServer();
+    FakeGlobalConfigHandle configHandle =
+        new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true));
+    StreamPoolHeartbeatSender heartbeatSender =
+        StreamPoolHeartbeatSender.Create(
+            poolFor(dedicatedServer), poolFor(getDataServer), configHandle);
+    Heartbeats heartbeats = singleHeartbeat();
+
+    heartbeatSender.sendHeartbeats(heartbeats);
+    assertEquals(1, dedicatedServer.getGetDataRequests().size());
+    assertEquals(0, getDataServer.getGetDataRequests().size());
+
+    heartbeatSender.sendHeartbeats(heartbeats);
+    assertEquals(2, dedicatedServer.getGetDataRequests().size());
+    assertEquals(0, getDataServer.getGetDataRequests().size());
+
+    // Turn off separate heartbeats: only getDataServer should see the next request.
+    configHandle.setConfig(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false));
+    heartbeatSender.sendHeartbeats(heartbeats);
+    assertEquals(2, dedicatedServer.getGetDataRequests().size());
+    assertEquals(1, getDataServer.getGetDataRequests().size());
+  }
+
+  @Test
+  public void sendsHeartbeatsOnGetDataStream() {
+    FakeWindmillServer dedicatedServer = newServer();
+    FakeWindmillServer getDataServer = newServer();
+    FakeGlobalConfigHandle configHandle =
+        new FakeGlobalConfigHandle(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ false));
+    StreamPoolHeartbeatSender heartbeatSender =
+        StreamPoolHeartbeatSender.Create(
+            poolFor(dedicatedServer), poolFor(getDataServer), configHandle);
+    Heartbeats heartbeats = singleHeartbeat();
+
+    heartbeatSender.sendHeartbeats(heartbeats);
+    assertEquals(0, dedicatedServer.getGetDataRequests().size());
+    assertEquals(1, getDataServer.getGetDataRequests().size());
+
+    heartbeatSender.sendHeartbeats(heartbeats);
+    assertEquals(0, dedicatedServer.getGetDataRequests().size());
+    assertEquals(2, getDataServer.getGetDataRequests().size());
+
+    // Turn on separate heartbeats: only dedicatedServer should see the next request.
+    configHandle.setConfig(getGlobalConfig(/*useSeparateHeartbeatStreams=*/ true));
+    heartbeatSender.sendHeartbeats(heartbeats);
+    assertEquals(1, dedicatedServer.getGetDataRequests().size());
+    assertEquals(2, getDataServer.getGetDataRequests().size());
+  }
+
+  /** Creates a fake server that reports failures through {@link #errorCollector}. */
+  private FakeWindmillServer newServer() {
+    return new FakeWindmillServer(errorCollector, c -> Optional.empty());
+  }
+
+  /** Creates a single-stream pool backed by {@code server}'s GetData streams. */
+  private static WindmillStreamPool<WindmillStream.GetDataStream> poolFor(
+      FakeWindmillServer server) {
+    return WindmillStreamPool.create(1, STREAM_TIMEOUT, server::getDataStream);
+  }
+
+  /** Builds a {@link Heartbeats} holding one heartbeat request for key {@code "key"}. */
+  private static Heartbeats singleHeartbeat() {
+    Heartbeats.Builder heartbeatsBuilder = Heartbeats.builder();
+    heartbeatsBuilder
+        .heartbeatRequestsBuilder()
+        .put("key", HeartbeatRequest.newBuilder().setWorkToken(123).build());
+    return heartbeatsBuilder.build();
+  }
+
+  /** Builds a global config whose job settings toggle separate heartbeat streams. */
+  private static StreamingGlobalConfig getGlobalConfig(boolean useSeparateHeartbeatStreams) {
+    return StreamingGlobalConfig.builder()
+        .setUserWorkerJobSettings(
+            UserWorkerRunnerV1Settings.newBuilder()
+                .setUseSeparateWindmillHeartbeatStreams(useSeparateHeartbeatStreams)
+                .build())
+        .build();
+  }
+}