This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d22fc2f72e Remove experiments guarding isolated channels enablement 
based on jobsettings (#32782)
8d22fc2f72e is described below

commit 8d22fc2f72e6d0781eea465e773c542b5907686d
Author: Arun Pandian <[email protected]>
AuthorDate: Tue Oct 22 00:57:48 2024 -0700

    Remove experiments guarding isolated channels enablement based on 
jobsettings (#32782)
---
 .../dataflow/worker/StreamingDataflowWorker.java      |  7 +------
 .../windmill/client/grpc/GrpcDispatcherClient.java    | 19 +++++--------------
 .../client/grpc/GrpcDispatcherClientTest.java         | 15 +--------------
 3 files changed, 7 insertions(+), 34 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index ecdba404151..52490602372 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -140,8 +140,6 @@ public final class StreamingDataflowWorker {
   private static final int DEFAULT_STATUS_PORT = 8081;
   private static final Random CLIENT_ID_GENERATOR = new Random();
   private static final String CHANNELZ_PATH = "/channelz";
-  public static final String 
STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL =
-      "streaming_engine_use_job_settings_for_heartbeat_pool";
 
   private final WindmillStateCache stateCache;
   private final StreamingWorkerStatusPages statusPages;
@@ -249,10 +247,7 @@ public final class StreamingDataflowWorker {
               GET_DATA_STREAM_TIMEOUT,
               windmillServer::getDataStream);
       getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, 
getDataStreamPool);
-      // Experiment gates the logic till backend changes are rollback safe
-      if (!DataflowRunner.hasExperiment(
-              options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL)
-          || options.getUseSeparateWindmillHeartbeatStreams() != null) {
+      if (options.getUseSeparateWindmillHeartbeatStreams() != null) {
         heartbeatSender =
             StreamPoolHeartbeatSender.Create(
                 
Boolean.TRUE.equals(options.getUseSeparateWindmillHeartbeatStreams())
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
index f96464150d4..6bae84483d1 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
@@ -53,8 +52,6 @@ import org.slf4j.LoggerFactory;
 public class GrpcDispatcherClient {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
-  static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS =
-      "streaming_engine_use_job_settings_for_isolated_channels";
   private final CountDownLatch onInitializedEndpoints;
 
   /**
@@ -80,18 +77,12 @@ public class GrpcDispatcherClient {
       DispatcherStubs initialDispatcherStubs,
       Random rand) {
     this.windmillStubFactoryFactory = windmillStubFactoryFactory;
-    if (DataflowRunner.hasExperiment(
-        options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS)) {
-      if (options.getUseWindmillIsolatedChannels() != null) {
-        this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
-        this.reactToIsolatedChannelsJobSetting = false;
-      } else {
-        this.useIsolatedChannels.set(false);
-        this.reactToIsolatedChannelsJobSetting = true;
-      }
-    } else {
-      
this.useIsolatedChannels.set(Boolean.TRUE.equals(options.getUseWindmillIsolatedChannels()));
+    if (options.getUseWindmillIsolatedChannels() != null) {
+      this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
       this.reactToIsolatedChannelsJobSetting = false;
+    } else {
+      this.useIsolatedChannels.set(false);
+      this.reactToIsolatedChannelsJobSetting = true;
     }
     this.windmillStubFactory.set(
         
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get()));
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
index 3f746d91a86..c04456906ea 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
@@ -34,7 +34,6 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.Isolat
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
 import org.hamcrest.Matcher;
 import org.junit.Test;
@@ -55,9 +54,6 @@ public class GrpcDispatcherClientTest {
     public void createsNewStubWhenIsolatedChannelsConfigIsChanged() {
       DataflowWorkerHarnessOptions options =
           PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
-      options.setExperiments(
-          Lists.newArrayList(
-              
GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS));
       GrpcDispatcherClient dispatcherClient =
           GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options));
       // Create first time with Isolated channels disabled
@@ -91,27 +87,18 @@ public class GrpcDispatcherClientTest {
     public static Collection<Object[]> data() {
       List<Object[]> list = new ArrayList<>();
       for (Boolean pipelineOption : new Boolean[] {true, false}) {
-        list.add(new Object[] {/*experimentEnabled=*/ false, pipelineOption});
-        list.add(new Object[] {/*experimentEnabled=*/ true, pipelineOption});
+        list.add(new Object[] {pipelineOption});
       }
       return list;
     }
 
     @Parameter(0)
-    public Boolean experimentEnabled;
-
-    @Parameter(1)
     public Boolean pipelineOption;
 
     @Test
     public void ignoresIsolatedChannelsConfigWithPipelineOption() {
       DataflowWorkerHarnessOptions options =
           PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
-      if (experimentEnabled) {
-        options.setExperiments(
-            Lists.newArrayList(
-                
GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS));
-      }
       options.setUseWindmillIsolatedChannels(pipelineOption);
       GrpcDispatcherClient dispatcherClient =
           GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options));

Reply via email to