This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ef5060416d9 [Dataflow Streaming] Enabled Heartbeat by Default (#31689)
ef5060416d9 is described below
commit ef5060416d9fed2e08a6682e69657c6fa9f98af4
Author: TongruiLi <[email protected]>
AuthorDate: Mon Jul 1 06:04:18 2024 -0700
[Dataflow Streaming] Enabled Heartbeat by Default (#31689)
---
.../dataflow/worker/StreamingDataflowWorker.java | 4 +--
.../windmill/client/grpc/GrpcWindmillServer.java | 4 +--
.../client/grpc/GrpcWindmillServerTest.java | 37 +++++++++-------------
3 files changed, 19 insertions(+), 26 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index fc1be2cd137..59819db88a0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -602,8 +602,8 @@ public class StreamingDataflowWorker {
.setStreamingRpcBatchLimit(options.getWindmillServiceStreamingRpcBatchLimit())
.setSendKeyedGetDataRequests(
!options.isEnableStreamingEngine()
- || !DataflowRunner.hasExperiment(
- options, "streaming_engine_send_new_heartbeat_requests"));
+ || DataflowRunner.hasExperiment(
+ options, "streaming_engine_disable_new_heartbeat_requests"));
}
private static BoundedQueueExecutor createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index abf85d98548..0ab03a80318 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -176,8 +176,8 @@ public final class GrpcWindmillServer extends WindmillServerStub {
testOptions(/* enableStreamingEngine= */ true, experiments);
boolean sendKeyedGetDataRequests =
!testOptions.isEnableStreamingEngine()
- || !DataflowRunner.hasExperiment(
- testOptions, "streaming_engine_send_new_heartbeat_requests");
+ || DataflowRunner.hasExperiment(
+ testOptions, "streaming_engine_disable_new_heartbeat_requests");
GrpcWindmillStreamFactory windmillStreamFactory =
GrpcWindmillStreamFactory.of(createJobHeader(testOptions, clientId))
.setSendKeyedGetDataRequests(sendKeyedGetDataRequests)
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index b1d5309e12d..6473d5527a8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -124,8 +124,16 @@ public class GrpcWindmillServerTest {
@Before
public void setUp() throws Exception {
- String name = "Fake server for " + getClass();
+ startServerAndClient(new ArrayList<>());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ server.shutdownNow();
+ }
+ private void startServerAndClient(List<String> experiments) throws Exception {
+ String name = "Fake server for " + getClass();
this.server =
InProcessServerBuilder.forName(name)
.fallbackHandlerRegistry(serviceRegistry)
@@ -136,17 +144,12 @@ public class GrpcWindmillServerTest {
this.client =
GrpcWindmillServer.newTestInstance(
name,
- new ArrayList<>(),
+ experiments,
clientId,
new FakeWindmillStubFactory(
() ->
grpcCleanup.register(WindmillChannelFactory.inProcessChannel(name))));
}
- @After
- public void tearDown() throws Exception {
- server.shutdownNow();
- }
-
private <Stream extends StreamObserver> void maybeInjectError(Stream stream) {
if (remainingErrors > 0 && ThreadLocalRandom.current().nextInt(20) == 0) {
try {
@@ -880,6 +883,11 @@ public class GrpcWindmillServerTest {
public void testStreamingGetDataHeartbeatsAsKeyedGetDataRequests() throws Exception {
// This server records the heartbeats observed but doesn't respond.
final Map<String, List<KeyedGetDataRequest>> getDataHeartbeats = new HashMap<>();
+ // Create a client and server different from the one in SetUp so we can add an experiment to the
+ // options passed in. This requires teardown and re-constructing the client and server
+ tearDown();
+ startServerAndClient(
+ Collections.singletonList("streaming_engine_disable_new_heartbeat_requests"));
serviceRegistry.addService(
new CloudWindmillServiceV1Alpha1ImplBase() {
@@ -973,21 +981,6 @@ public class GrpcWindmillServerTest {
@Test
public void testStreamingGetDataHeartbeatsAsHeartbeatRequests() throws Exception {
- // Create a client and server different from the one in SetUp so we can add an experiment to the
- // options passed in.
- this.server =
- InProcessServerBuilder.forName("TestServer")
- .fallbackHandlerRegistry(serviceRegistry)
- .executor(Executors.newFixedThreadPool(1))
- .build()
- .start();
- this.client =
- GrpcWindmillServer.newTestInstance(
- "TestServer",
- Collections.singletonList("streaming_engine_send_new_heartbeat_requests"),
- clientId,
- new FakeWindmillStubFactory(
- () -> WindmillChannelFactory.inProcessChannel("TestServer")));
// This server records the heartbeats observed but doesn't respond.
final List<ComputationHeartbeatRequest> receivedHeartbeats = new ArrayList<>();