This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8d22fc2f72e Remove experiments guarding isolated channels enablement
based on jobsettings (#32782)
8d22fc2f72e is described below
commit 8d22fc2f72e6d0781eea465e773c542b5907686d
Author: Arun Pandian <[email protected]>
AuthorDate: Tue Oct 22 00:57:48 2024 -0700
Remove experiments guarding isolated channels enablement based on
jobsettings (#32782)
---
.../dataflow/worker/StreamingDataflowWorker.java | 7 +------
.../windmill/client/grpc/GrpcDispatcherClient.java | 19 +++++--------------
.../client/grpc/GrpcDispatcherClientTest.java | 15 +--------------
3 files changed, 7 insertions(+), 34 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index ecdba404151..52490602372 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -140,8 +140,6 @@ public final class StreamingDataflowWorker {
private static final int DEFAULT_STATUS_PORT = 8081;
private static final Random CLIENT_ID_GENERATOR = new Random();
private static final String CHANNELZ_PATH = "/channelz";
- public static final String
STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =
- "streaming_engine_use_job_settings_for_heartbeat_pool";
private final WindmillStateCache stateCache;
private final StreamingWorkerStatusPages statusPages;
@@ -249,10 +247,7 @@ public final class StreamingDataflowWorker {
GET_DATA_STREAM_TIMEOUT,
windmillServer::getDataStream);
getDataClient = new StreamPoolGetDataClient(getDataMetricTracker,
getDataStreamPool);
- // Experiment gates the logic till backend changes are rollback safe
- if (!DataflowRunner.hasExperiment(
- options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)
- || options.getUseSeparateWindmillHeartbeatStreams() != null) {
+ if (options.getUseSeparateWindmillHeartbeatStreams() != null) {
heartbeatSender =
StreamPoolHeartbeatSender.Create(
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
index f96464150d4..6bae84483d1 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
@@ -53,8 +52,6 @@ import org.slf4j.LoggerFactory;
public class GrpcDispatcherClient {
private static final Logger LOG =
LoggerFactory.getLogger(GrpcDispatcherClient.class);
- static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS =
- "streaming_engine_use_job_settings_for_isolated_channels";
private final CountDownLatch onInitializedEndpoints;
/**
@@ -80,18 +77,12 @@ public class GrpcDispatcherClient {
DispatcherStubs initialDispatcherStubs,
Random rand) {
this.windmillStubFactoryFactory = windmillStubFactoryFactory;
- if (DataflowRunner.hasExperiment(
- options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS)) {
- if (options.getUseWindmillIsolatedChannels() != null) {
- this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
- this.reactToIsolatedChannelsJobSetting = false;
- } else {
- this.useIsolatedChannels.set(false);
- this.reactToIsolatedChannelsJobSetting = true;
- }
- } else {
-
this.useIsolatedChannels.set(Boolean.TRUE.equals(options.getUseWindmillIsolatedChannels()));
+ if (options.getUseWindmillIsolatedChannels() != null) {
+ this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
this.reactToIsolatedChannelsJobSetting = false;
+ } else {
+ this.useIsolatedChannels.set(false);
+ this.reactToIsolatedChannelsJobSetting = true;
}
this.windmillStubFactory.set(
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get()));
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
index 3f746d91a86..c04456906ea 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
@@ -34,7 +34,6 @@ import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.Isolat
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
import org.hamcrest.Matcher;
import org.junit.Test;
@@ -55,9 +54,6 @@ public class GrpcDispatcherClientTest {
public void createsNewStubWhenIsolatedChannelsConfigIsChanged() {
DataflowWorkerHarnessOptions options =
PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
- options.setExperiments(
- Lists.newArrayList(
-
GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS));
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options));
// Create first time with Isolated channels disabled
@@ -91,27 +87,18 @@ public class GrpcDispatcherClientTest {
public static Collection<Object[]> data() {
List<Object[]> list = new ArrayList<>();
for (Boolean pipelineOption : new Boolean[] {true, false}) {
- list.add(new Object[] {/*experimentEnabled=*/ false, pipelineOption});
- list.add(new Object[] {/*experimentEnabled=*/ true, pipelineOption});
+ list.add(new Object[] {pipelineOption});
}
return list;
}
@Parameter(0)
- public Boolean experimentEnabled;
-
- @Parameter(1)
public Boolean pipelineOption;
@Test
public void ignoresIsolatedChannelsConfigWithPipelineOption() {
DataflowWorkerHarnessOptions options =
PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
- if (experimentEnabled) {
- options.setExperiments(
- Lists.newArrayList(
-
GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS));
- }
options.setUseWindmillIsolatedChannels(pipelineOption);
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options));