This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 02d8f56a1e8 [Dataflow Streaming] Remove rolled out Windmill isolated 
channels flag (#37844)
02d8f56a1e8 is described below

commit 02d8f56a1e876d135ae157b3b4aa2ceed2be16c0
Author: Arun Pandian <[email protected]>
AuthorDate: Fri Mar 13 01:53:09 2026 -0700

    [Dataflow Streaming] Remove rolled out Windmill isolated channels flag 
(#37844)
---
 .../options/DataflowStreamingPipelineOptions.java  |   4 +-
 .../dataflow/worker/StreamingDataflowWorker.java   |   6 +-
 .../windmill/client/grpc/GrpcDispatcherClient.java |  48 +------
 .../windmill/client/grpc/GrpcWindmillServer.java   |   3 +-
 .../grpc/stubs/WindmillStubFactoryFactory.java     |   2 +-
 .../grpc/stubs/WindmillStubFactoryFactoryImpl.java |  19 +--
 .../FanOutStreamingEngineWorkerHarnessTest.java    |   3 -
 .../client/grpc/GrpcDispatcherClientTest.java      | 141 ---------------------
 .../testing/FakeWindmillStubFactoryFactory.java    |   2 +-
 .../worker/windmill/src/main/proto/windmill.proto  |   5 +-
 10 files changed, 25 insertions(+), 208 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index 9cc98276f2d..2e0e7efd73a 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -126,7 +126,9 @@ public interface DataflowStreamingPipelineOptions extends 
PipelineOptions {
 
   void setWindmillMessagesBetweenIsReadyChecks(int value);
 
-  @Description("If true, a most a single active rpc will be used per channel.")
+  /** @deprecated since 2.73.0 */
+  @Deprecated
+  @Description("Unused flag.")
   Boolean getUseWindmillIsolatedChannels();
 
   void setUseWindmillIsolatedChannels(Boolean value);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 98f596141ee..1b13b72986f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -725,7 +725,7 @@ public final class StreamingDataflowWorker {
           Function<ComputationConfig.Fetcher, ComputationStateCache> 
computationStateCacheFactory) {
     if (options.isEnableStreamingEngine()) {
       GrpcDispatcherClient dispatcherClient =
-          GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options));
+          GrpcDispatcherClient.create(new 
WindmillStubFactoryFactoryImpl(options));
       ComputationConfig.Fetcher configFetcher =
           StreamingEngineComputationConfigFetcher.create(
               options.getGlobalConfigRefreshPeriod().getMillis(), 
dataflowServiceClient);
@@ -753,7 +753,7 @@ public final class StreamingDataflowWorker {
     if (options.getWindmillServiceEndpoint() != null
         || options.getLocalWindmillHostport().startsWith("grpc:")) {
       GrpcDispatcherClient dispatcherClient =
-          GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options));
+          GrpcDispatcherClient.create(new 
WindmillStubFactoryFactoryImpl(options));
       GrpcWindmillStreamFactory windmillStreamFactory =
           windmillStreamFactoryBuilder
               .setHealthCheckIntervalMillis(
@@ -920,7 +920,7 @@ public final class StreamingDataflowWorker {
         createGrpcwindmillStreamFactoryBuilder(options, 1)
             .setProcessHeartbeatResponses(
                 new 
WorkHeartbeatResponseProcessor(computationStateCache::get));
-    GrpcDispatcherClient grpcDispatcherClient = 
GrpcDispatcherClient.create(options, stubFactory);
+    GrpcDispatcherClient grpcDispatcherClient = 
GrpcDispatcherClient.create(stubFactory);
     grpcDispatcherClient.consumeWindmillDispatcherEndpoints(
         ImmutableSet.<HostAndPort>builder()
             .add(HostAndPort.fromHost("StreamingDataflowWorkerTest"))
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
index 82e66c4b0d7..3a0773af7fa 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -28,11 +28,9 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub;
@@ -66,42 +64,25 @@ public class GrpcDispatcherClient {
   @GuardedBy("this")
   private final Random rand;
 
-  private final WindmillStubFactoryFactory windmillStubFactoryFactory;
-
-  private final AtomicReference<WindmillStubFactory> windmillStubFactory = new 
AtomicReference<>();
-
-  private final AtomicBoolean useIsolatedChannels = new AtomicBoolean();
-  private final boolean reactToIsolatedChannelsJobSetting;
+  private final WindmillStubFactory windmillStubFactory;
 
   private GrpcDispatcherClient(
-      DataflowWorkerHarnessOptions options,
       WindmillStubFactoryFactory windmillStubFactoryFactory,
       DispatcherStubs initialDispatcherStubs,
       Random rand) {
-    this.windmillStubFactoryFactory = windmillStubFactoryFactory;
-    if (options.getUseWindmillIsolatedChannels() != null) {
-      this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
-      this.reactToIsolatedChannelsJobSetting = false;
-    } else {
-      this.useIsolatedChannels.set(false);
-      this.reactToIsolatedChannelsJobSetting = true;
-    }
-    this.windmillStubFactory.set(
-        
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get()));
+    this.windmillStubFactory = 
windmillStubFactoryFactory.makeWindmillStubFactory();
     this.rand = rand;
     this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
     this.onInitializedEndpoints = new CountDownLatch(1);
   }
 
-  public static GrpcDispatcherClient create(
-      DataflowWorkerHarnessOptions options, WindmillStubFactoryFactory 
windmillStubFactoryFactory) {
+  public static GrpcDispatcherClient create(WindmillStubFactoryFactory 
windmillStubFactoryFactory) {
     return new GrpcDispatcherClient(
-        options, windmillStubFactoryFactory, DispatcherStubs.empty(), new 
Random());
+        windmillStubFactoryFactory, DispatcherStubs.empty(), new Random());
   }
 
   @VisibleForTesting
   public static GrpcDispatcherClient forTesting(
-      DataflowWorkerHarnessOptions options,
       WindmillStubFactoryFactory windmillStubFactoryFactory,
       List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
       List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs,
@@ -110,7 +91,6 @@ public class GrpcDispatcherClient {
         dispatcherEndpoints.size() == windmillServiceStubs.size()
             && windmillServiceStubs.size() == 
windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        options,
         windmillStubFactoryFactory,
         DispatcherStubs.create(
             dispatcherEndpoints, windmillServiceStubs, 
windmillMetadataServiceStubs),
@@ -172,31 +152,17 @@ public class GrpcDispatcherClient {
       LOG.warn("Dispatcher client received empty windmill service endpoints 
from global config");
       return;
     }
-    boolean forceRecreateStubs = false;
-    if (reactToIsolatedChannelsJobSetting) {
-      boolean useIsolatedChannels = 
config.userWorkerJobSettings().getUseWindmillIsolatedChannels();
-      if (this.useIsolatedChannels.getAndSet(useIsolatedChannels) != 
useIsolatedChannels) {
-        windmillStubFactory.set(
-            
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels));
-        forceRecreateStubs = true;
-      }
-    }
-    consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints(), 
forceRecreateStubs);
+    consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints());
   }
 
   public synchronized void consumeWindmillDispatcherEndpoints(
       ImmutableSet<HostAndPort> dispatcherEndpoints) {
-    consumeWindmillDispatcherEndpoints(dispatcherEndpoints, /* 
forceRecreateStubs= */ false);
-  }
-
-  private synchronized void consumeWindmillDispatcherEndpoints(
-      ImmutableSet<HostAndPort> dispatcherEndpoints, boolean 
forceRecreateStubs) {
     ImmutableSet<HostAndPort> currentDispatcherEndpoints =
         dispatcherStubs.get().dispatcherEndpoints();
     Preconditions.checkArgument(
         dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
         "Cannot set dispatcher endpoints to nothing.");
-    if (!forceRecreateStubs && 
currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
+    if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
       // The endpoints are equal don't recreate the stubs.
       return;
     }
@@ -207,7 +173,7 @@ public class GrpcDispatcherClient {
     }
 
     LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", 
dispatcherEndpoints);
-    dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, 
windmillStubFactory.get()));
+    dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, 
windmillStubFactory));
     onInitializedEndpoints.countDown();
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index 1cd2d2f5315..4d5acdc8071 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -166,7 +166,6 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
     Set<HostAndPort> dispatcherEndpoints = 
Sets.newHashSet(HostAndPort.fromHost(name));
     GrpcDispatcherClient dispatcherClient =
         GrpcDispatcherClient.forTesting(
-            testOptions,
             windmillStubFactoryFactory,
             windmillServiceStubs,
             windmillMetadataServiceStubs,
@@ -198,7 +197,7 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
             options,
             GrpcWindmillStreamFactory.of(createJobHeader(options, 1)).build(),
             // No-op, Appliance does not use Dispatcher to call Streaming 
Engine.
-            GrpcDispatcherClient.create(options, windmillStubFactoryFactory));
+            GrpcDispatcherClient.create(windmillStubFactoryFactory));
     testServer.syncApplianceStub = 
createWindmillApplianceStubWithDeadlineInterceptor(channel);
     return testServer;
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
index f7dd9a22b99..8daff4faceb 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
@@ -21,5 +21,5 @@ import org.apache.beam.sdk.annotations.Internal;
 
 @Internal
 public interface WindmillStubFactoryFactory {
-  WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels);
+  WindmillStubFactory makeWindmillStubFactory();
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
index 6de6b633753..00dfb155c29 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
@@ -33,24 +33,19 @@ public class WindmillStubFactoryFactoryImpl implements 
WindmillStubFactoryFactor
   }
 
   @Override
-  public WindmillStubFactory makeWindmillStubFactory(boolean 
useIsolatedChannels) {
+  public WindmillStubFactory makeWindmillStubFactory() {
     ChannelCache channelCache =
         ChannelCache.create(
             (flowControlSettings, serviceAddress) ->
                 // IsolationChannel will create and manage separate RPC 
channels to the same
                 // serviceAddress via calling the channelFactory, else just 
directly return the
                 // RPC channel.
-                useIsolatedChannels
-                    ? IsolationChannel.create(
-                        () ->
-                            remoteChannel(
-                                serviceAddress.getServiceAddress(),
-                                windmillServiceRpcChannelAliveTimeoutSec,
-                                flowControlSettings))
-                    : remoteChannel(
-                        serviceAddress.getServiceAddress(),
-                        windmillServiceRpcChannelAliveTimeoutSec,
-                        flowControlSettings));
+                IsolationChannel.create(
+                    () ->
+                        remoteChannel(
+                            serviceAddress.getServiceAddress(),
+                            windmillServiceRpcChannelAliveTimeoutSec,
+                            flowControlSettings)));
     return ChannelCachingRemoteStubFactory.create(gcpCredential, channelCache);
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
index 94c8f4b7595..441319b66a8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
@@ -58,7 +57,6 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetSpender;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Server;
 import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessServerBuilder;
 import 
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessSocketAddress;
@@ -112,7 +110,6 @@ public class FanOutStreamingEngineWorkerHarnessTest {
           () -> 
grpcCleanup.register(WindmillChannels.inProcessChannel(CHANNEL_NAME)));
   private final GrpcDispatcherClient dispatcherClient =
       GrpcDispatcherClient.forTesting(
-          PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class),
           new FakeWindmillStubFactoryFactory(stubFactory),
           new ArrayList<>(),
           new ArrayList<>(),
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
deleted file mode 100644
index c04456906ea..00000000000
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
-import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
-import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
-import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
-import org.hamcrest.Matcher;
-import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Enclosed.class)
-public class GrpcDispatcherClientTest {
-
-  @RunWith(JUnit4.class)
-  public static class RespectsJobSettingTest {
-
-    @Test
-    public void createsNewStubWhenIsolatedChannelsConfigIsChanged() {
-      DataflowWorkerHarnessOptions options =
-          PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
-      GrpcDispatcherClient dispatcherClient =
-          GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options));
-      // Create first time with Isolated channels disabled
-      
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ 
false));
-      CloudWindmillServiceV1Alpha1Stub stub1 = 
dispatcherClient.getWindmillServiceStub();
-      CloudWindmillServiceV1Alpha1Stub stub2 = 
dispatcherClient.getWindmillServiceStub();
-      assertSame(stub2, stub1);
-      assertThat(stub1.getChannel(), not(instanceOf(IsolationChannel.class)));
-
-      // Enable Isolated channels
-      
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ 
true));
-      CloudWindmillServiceV1Alpha1Stub stub3 = 
dispatcherClient.getWindmillServiceStub();
-      assertNotSame(stub3, stub1);
-
-      assertThat(stub3.getChannel(), instanceOf(IsolationChannel.class));
-      CloudWindmillServiceV1Alpha1Stub stub4 = 
dispatcherClient.getWindmillServiceStub();
-      assertSame(stub3, stub4);
-
-      // Disable Isolated channels
-      
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ 
false));
-      CloudWindmillServiceV1Alpha1Stub stub5 = 
dispatcherClient.getWindmillServiceStub();
-      assertNotSame(stub4, stub5);
-      assertThat(stub5.getChannel(), not(instanceOf(IsolationChannel.class)));
-    }
-  }
-
-  @RunWith(Parameterized.class)
-  public static class RespectsPipelineOptionsTest {
-
-    @Parameters
-    public static Collection<Object[]> data() {
-      List<Object[]> list = new ArrayList<>();
-      for (Boolean pipelineOption : new Boolean[] {true, false}) {
-        list.add(new Object[] {pipelineOption});
-      }
-      return list;
-    }
-
-    @Parameter(0)
-    public Boolean pipelineOption;
-
-    @Test
-    public void ignoresIsolatedChannelsConfigWithPipelineOption() {
-      DataflowWorkerHarnessOptions options =
-          PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
-      options.setUseWindmillIsolatedChannels(pipelineOption);
-      GrpcDispatcherClient dispatcherClient =
-          GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options));
-      Matcher<Object> classMatcher =
-          pipelineOption
-              ? instanceOf(IsolationChannel.class)
-              : not(instanceOf(IsolationChannel.class));
-
-      // Job setting disabled, PipelineOption enabled
-      
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ 
false));
-      CloudWindmillServiceV1Alpha1Stub stub1 = 
dispatcherClient.getWindmillServiceStub();
-      CloudWindmillServiceV1Alpha1Stub stub2 = 
dispatcherClient.getWindmillServiceStub();
-      assertSame(stub2, stub1);
-      assertThat(stub1.getChannel(), classMatcher);
-
-      // Job setting enabled
-      
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ 
true));
-      CloudWindmillServiceV1Alpha1Stub stub3 = 
dispatcherClient.getWindmillServiceStub();
-      assertSame(stub3, stub1);
-
-      CloudWindmillServiceV1Alpha1Stub stub4 = 
dispatcherClient.getWindmillServiceStub();
-      assertSame(stub3, stub4);
-
-      // Job setting disabled
-      
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ 
false));
-      CloudWindmillServiceV1Alpha1Stub stub5 = 
dispatcherClient.getWindmillServiceStub();
-      assertSame(stub4, stub5);
-    }
-  }
-
-  static StreamingGlobalConfig getGlobalConfig(boolean 
useWindmillIsolatedChannels) {
-    return StreamingGlobalConfig.builder()
-        
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromString("windmill:1234")))
-        .setUserWorkerJobSettings(
-            UserWorkerRunnerV1Settings.newBuilder()
-                .setUseWindmillIsolatedChannels(useWindmillIsolatedChannels)
-                .build())
-        .build();
-  }
-}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
index 51f8b8e1432..741b025e601 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
@@ -29,7 +29,7 @@ public class FakeWindmillStubFactoryFactory implements 
WindmillStubFactoryFactor
   }
 
   @Override
-  public WindmillStubFactory makeWindmillStubFactory(boolean 
useIsolatedChannels) {
+  public WindmillStubFactory makeWindmillStubFactory() {
     return windmillStubFactory;
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
 
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index a4b3df906dd..8c6cc08f4d5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ 
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -966,9 +966,6 @@ enum ConnectivityType {
 
 // Settings to control runtime behavior of the java runner v1 user worker.
 message UserWorkerRunnerV1Settings {
-  // If true, use separate channels for each windmill RPC.
-  optional bool use_windmill_isolated_channels = 1 [default = true];
-
   // If true, use separate streaming RPC for windmill heartbeats and state 
reads.
   optional bool use_separate_windmill_heartbeat_streams = 2 [default = true];
 
@@ -976,6 +973,8 @@ message UserWorkerRunnerV1Settings {
 
   optional ConnectivityType connectivity_type = 4
     [default = CONNECTIVITY_TYPE_DEFAULT];
+
+  reserved 1;
 }
 
 service WindmillAppliance {

Reply via email to