This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e8c9143ca5 [Dataflow Streaming] Use isolated windmill streams based on job settings (#32503)
8e8c9143ca5 is described below

commit 8e8c9143ca512976e6063a4e2cd9b4041634dab0
Author: Arun Pandian <[email protected]>
AuthorDate: Thu Sep 26 04:02:30 2024 -0700

    [Dataflow Streaming] Use isolated windmill streams based on job settings (#32503)
---
 .../options/DataflowStreamingPipelineOptions.java  |   5 +-
 .../dataflow/worker/StreamingDataflowWorker.java   |  30 +---
 .../windmill/client/grpc/GrpcDispatcherClient.java |  66 +++++++--
 .../windmill/client/grpc/GrpcWindmillServer.java   |  16 ++-
 .../grpc/stubs/WindmillStubFactoryFactory.java     |  25 ++++
 .../grpc/stubs/WindmillStubFactoryFactoryImpl.java |  54 ++++++++
 .../FanOutStreamingEngineWorkerHarnessTest.java    |   9 +-
 .../client/grpc/GrpcDispatcherClientTest.java      | 154 +++++++++++++++++++++
 .../client/grpc/GrpcWindmillServerTest.java        |  11 +-
 .../testing/FakeWindmillStubFactoryFactory.java    |  35 +++++
 10 files changed, 354 insertions(+), 51 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index 10df6e24f49..6a0208f1447 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -125,10 +125,9 @@ public interface DataflowStreamingPipelineOptions extends 
PipelineOptions {
   void setWindmillMessagesBetweenIsReadyChecks(int value);
 
   @Description("If true, a most a single active rpc will be used per channel.")
-  @Default.Boolean(false)
-  boolean getUseWindmillIsolatedChannels();
+  Boolean getUseWindmillIsolatedChannels();
 
-  void setUseWindmillIsolatedChannels(boolean value);
+  void setUseWindmillIsolatedChannels(Boolean value);
 
   @Description(
       "If true, separate streaming rpcs will be used for heartbeats instead of 
sharing streams with state reads.")
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 8b440c306f0..ecdba404151 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
-import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
 import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.google.api.services.dataflow.model.CounterUpdate;
@@ -63,7 +62,6 @@ import 
org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
 import 
org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
@@ -79,10 +77,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServ
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
 import 
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
@@ -100,7 +95,6 @@ import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.construction.CoderTranslation;
-import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -449,7 +443,7 @@ public final class StreamingDataflowWorker {
     GrpcWindmillStreamFactory windmillStreamFactory;
     if (options.isEnableStreamingEngine()) {
       GrpcDispatcherClient dispatcherClient =
-          GrpcDispatcherClient.create(createStubFactory(options));
+          GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options));
       configFetcher =
           StreamingEngineComputationConfigFetcher.create(
               options.getGlobalConfigRefreshPeriod().getMillis(), 
dataflowServiceClient);
@@ -475,7 +469,7 @@ public final class StreamingDataflowWorker {
             GrpcWindmillServer.create(
                 options,
                 windmillStreamFactory,
-                GrpcDispatcherClient.create(createStubFactory(options)));
+                GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options)));
       } else {
         windmillStreamFactory = windmillStreamFactoryBuilder.build();
         windmillServer = new 
JniWindmillApplianceServer(options.getLocalWindmillHostport());
@@ -679,24 +673,6 @@ public final class StreamingDataflowWorker {
     worker.start();
   }
 
-  private static ChannelCachingStubFactory createStubFactory(
-      DataflowWorkerHarnessOptions workerOptions) {
-    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
-        serviceAddress ->
-            remoteChannel(
-                serviceAddress, 
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec());
-    ChannelCache channelCache =
-        ChannelCache.create(
-            serviceAddress ->
-                // IsolationChannel will create and manage separate RPC 
channels to the same
-                // serviceAddress via calling the channelFactory, else just 
directly return the
-                // RPC channel.
-                workerOptions.getUseWindmillIsolatedChannels()
-                    ? IsolationChannel.create(() -> 
channelFactory.apply(serviceAddress))
-                    : channelFactory.apply(serviceAddress));
-    return 
ChannelCachingRemoteStubFactory.create(workerOptions.getGcpCredential(), 
channelCache);
-  }
-
   private static int chooseMaxThreads(DataflowWorkerHarnessOptions options) {
     if (options.getNumberOfWorkerHarnessThreads() != 0) {
       return options.getNumberOfWorkerHarnessThreads();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
index 412608ea398..f96464150d4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -26,9 +26,12 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub;
@@ -36,6 +39,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Al
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -49,7 +53,8 @@ import org.slf4j.LoggerFactory;
 public class GrpcDispatcherClient {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
-  private final WindmillStubFactory windmillStubFactory;
+  static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS =
+      "streaming_engine_use_job_settings_for_isolated_channels";
   private final CountDownLatch onInitializedEndpoints;
 
   /**
@@ -62,23 +67,49 @@ public class GrpcDispatcherClient {
   @GuardedBy("this")
   private final Random rand;
 
+  private final WindmillStubFactoryFactory windmillStubFactoryFactory;
+
+  private final AtomicReference<WindmillStubFactory> windmillStubFactory = new 
AtomicReference<>();
+
+  private final AtomicBoolean useIsolatedChannels = new AtomicBoolean();
+  private final boolean reactToIsolatedChannelsJobSetting;
+
   private GrpcDispatcherClient(
-      WindmillStubFactory windmillStubFactory,
+      DataflowWorkerHarnessOptions options,
+      WindmillStubFactoryFactory windmillStubFactoryFactory,
       DispatcherStubs initialDispatcherStubs,
       Random rand) {
-    this.windmillStubFactory = windmillStubFactory;
+    this.windmillStubFactoryFactory = windmillStubFactoryFactory;
+    if (DataflowRunner.hasExperiment(
+        options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS)) {
+      if (options.getUseWindmillIsolatedChannels() != null) {
+        this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
+        this.reactToIsolatedChannelsJobSetting = false;
+      } else {
+        this.useIsolatedChannels.set(false);
+        this.reactToIsolatedChannelsJobSetting = true;
+      }
+    } else {
+      
this.useIsolatedChannels.set(Boolean.TRUE.equals(options.getUseWindmillIsolatedChannels()));
+      this.reactToIsolatedChannelsJobSetting = false;
+    }
+    this.windmillStubFactory.set(
+        
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get()));
     this.rand = rand;
     this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
     this.onInitializedEndpoints = new CountDownLatch(1);
   }
 
-  public static GrpcDispatcherClient create(WindmillStubFactory 
windmillStubFactory) {
-    return new GrpcDispatcherClient(windmillStubFactory, 
DispatcherStubs.empty(), new Random());
+  public static GrpcDispatcherClient create(
+      DataflowWorkerHarnessOptions options, WindmillStubFactoryFactory 
windmillStubFactoryFactory) {
+    return new GrpcDispatcherClient(
+        options, windmillStubFactoryFactory, DispatcherStubs.empty(), new 
Random());
   }
 
   @VisibleForTesting
   public static GrpcDispatcherClient forTesting(
-      WindmillStubFactory windmillGrpcStubFactory,
+      DataflowWorkerHarnessOptions options,
+      WindmillStubFactoryFactory windmillStubFactoryFactory,
       List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
       List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs,
       Set<HostAndPort> dispatcherEndpoints) {
@@ -86,7 +117,8 @@ public class GrpcDispatcherClient {
         dispatcherEndpoints.size() == windmillServiceStubs.size()
             && windmillServiceStubs.size() == 
windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        windmillGrpcStubFactory,
+        options,
+        windmillStubFactoryFactory,
         DispatcherStubs.create(
             dispatcherEndpoints, windmillServiceStubs, 
windmillMetadataServiceStubs),
         new Random());
@@ -153,17 +185,31 @@ public class GrpcDispatcherClient {
       LOG.warn("Dispatcher client received empty windmill service endpoints 
from global config");
       return;
     }
-    consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints());
+    boolean forceRecreateStubs = false;
+    if (reactToIsolatedChannelsJobSetting) {
+      boolean useIsolatedChannels = 
config.userWorkerJobSettings().getUseWindmillIsolatedChannels();
+      if (this.useIsolatedChannels.getAndSet(useIsolatedChannels) != 
useIsolatedChannels) {
+        windmillStubFactory.set(
+            
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels));
+        forceRecreateStubs = true;
+      }
+    }
+    consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints(), 
forceRecreateStubs);
   }
 
   public synchronized void consumeWindmillDispatcherEndpoints(
       ImmutableSet<HostAndPort> dispatcherEndpoints) {
+    consumeWindmillDispatcherEndpoints(dispatcherEndpoints, 
/*forceRecreateStubs=*/ false);
+  }
+
+  private synchronized void consumeWindmillDispatcherEndpoints(
+      ImmutableSet<HostAndPort> dispatcherEndpoints, boolean 
forceRecreateStubs) {
     ImmutableSet<HostAndPort> currentDispatcherEndpoints =
         dispatcherStubs.get().dispatcherEndpoints();
     Preconditions.checkArgument(
         dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
         "Cannot set dispatcher endpoints to nothing.");
-    if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
+    if (!forceRecreateStubs && 
currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
       // The endpoints are equal don't recreate the stubs.
       return;
     }
@@ -174,7 +220,7 @@ public class GrpcDispatcherClient {
     }
 
     LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", 
dispatcherEndpoints);
-    dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, 
windmillStubFactory));
+    dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, 
windmillStubFactory.get()));
     onInitializedEndpoints.countDown();
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index 1fce4d238b2..31049598267 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -53,7 +53,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
-import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers;
 import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
@@ -154,7 +154,7 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
       String name,
       List<String> experiments,
       long clientId,
-      WindmillStubFactory windmillStubFactory) {
+      WindmillStubFactoryFactory windmillStubFactoryFactory) {
     ManagedChannel inProcessChannel = inProcessChannel(name);
     CloudWindmillServiceV1Alpha1Stub stub =
         CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel);
@@ -164,16 +164,18 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
     List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
         Lists.newArrayList(metadataStub);
 
+    DataflowWorkerHarnessOptions testOptions =
+        testOptions(/* enableStreamingEngine= */ true, experiments);
+
     Set<HostAndPort> dispatcherEndpoints = 
Sets.newHashSet(HostAndPort.fromHost(name));
     GrpcDispatcherClient dispatcherClient =
         GrpcDispatcherClient.forTesting(
-            windmillStubFactory,
+            testOptions,
+            windmillStubFactoryFactory,
             windmillServiceStubs,
             windmillMetadataServiceStubs,
             dispatcherEndpoints);
 
-    DataflowWorkerHarnessOptions testOptions =
-        testOptions(/* enableStreamingEngine= */ true, experiments);
     boolean sendKeyedGetDataRequests =
         !testOptions.isEnableStreamingEngine()
             || DataflowRunner.hasExperiment(
@@ -190,7 +192,7 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
 
   @VisibleForTesting
   static GrpcWindmillServer newApplianceTestInstance(
-      Channel channel, WindmillStubFactory windmillStubFactory) {
+      Channel channel, WindmillStubFactoryFactory windmillStubFactoryFactory) {
     DataflowWorkerHarnessOptions options =
         testOptions(/* enableStreamingEngine= */ false, new ArrayList<>());
     GrpcWindmillServer testServer =
@@ -198,7 +200,7 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
             options,
             GrpcWindmillStreamFactory.of(createJobHeader(options, 1)).build(),
             // No-op, Appliance does not use Dispatcher to call Streaming 
Engine.
-            GrpcDispatcherClient.create(windmillStubFactory));
+            GrpcDispatcherClient.create(options, windmillStubFactoryFactory));
     testServer.syncApplianceStub = 
createWindmillApplianceStubWithDeadlineInterceptor(channel);
     return testServer;
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
new file mode 100644
index 00000000000..f7dd9a22b99
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+public interface WindmillStubFactoryFactory {
+  WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels);
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
new file mode 100644
index 00000000000..f6ffb9c1451
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import static 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
+
+import com.google.auth.Credentials;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+
+public class WindmillStubFactoryFactoryImpl implements 
WindmillStubFactoryFactory {
+
+  private final int windmillServiceRpcChannelAliveTimeoutSec;
+  private final Credentials gcpCredential;
+
+  public WindmillStubFactoryFactoryImpl(DataflowWorkerHarnessOptions 
workerOptions) {
+    this.gcpCredential = workerOptions.getGcpCredential();
+    this.windmillServiceRpcChannelAliveTimeoutSec =
+        workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec();
+  }
+
+  @Override
+  public WindmillStubFactory makeWindmillStubFactory(boolean 
useIsolatedChannels) {
+    Function<WindmillServiceAddress, ManagedChannel> channelFactory =
+        serviceAddress -> remoteChannel(serviceAddress, 
windmillServiceRpcChannelAliveTimeoutSec);
+    ChannelCache channelCache =
+        ChannelCache.create(
+            serviceAddress ->
+                // IsolationChannel will create and manage separate RPC 
channels to the same
+                // serviceAddress via calling the channelFactory, else just 
directly return the
+                // RPC channel.
+                useIsolatedChannels
+                    ? IsolationChannel.create(() -> 
channelFactory.apply(serviceAddress))
+                    : channelFactory.apply(serviceAddress));
+    return ChannelCachingRemoteStubFactory.create(gcpCredential, channelCache);
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
index aaa71b6598e..ed8815c48e7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
 import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
 import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
@@ -54,10 +55,12 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmill
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory;
 import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
 import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetSpender;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
 import 
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
 import 
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessSocketAddress;
@@ -111,7 +114,11 @@ public class FanOutStreamingEngineWorkerHarnessTest {
                   
WindmillChannelFactory.inProcessChannel("StreamingEngineClientTest")));
   private final GrpcDispatcherClient dispatcherClient =
       GrpcDispatcherClient.forTesting(
-          stubFactory, new ArrayList<>(), new ArrayList<>(), new HashSet<>());
+          PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class),
+          new FakeWindmillStubFactoryFactory(stubFactory),
+          new ArrayList<>(),
+          new ArrayList<>(),
+          new HashSet<>());
   @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private Server fakeStreamingEngineServer;
   private CountDownLatch getWorkerMetadataReady;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
new file mode 100644
index 00000000000..3f746d91a86
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import 
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Enclosed.class)
+public class GrpcDispatcherClientTest {
+
+  @RunWith(JUnit4.class)
+  public static class RespectsJobSettingTest {
+
+    @Test
+    public void createsNewStubWhenIsolatedChannelsConfigIsChanged() {
+      DataflowWorkerHarnessOptions options =
+          PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
+      options.setExperiments(
+          Lists.newArrayList(
+              
GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS));
+      GrpcDispatcherClient dispatcherClient =
+          GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options));
+      // Create first time with Isolated channels disabled
+      
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ 
false));
+      CloudWindmillServiceV1Alpha1Stub stub1 = 
dispatcherClient.getWindmillServiceStub();
+      CloudWindmillServiceV1Alpha1Stub stub2 = 
dispatcherClient.getWindmillServiceStub();
+      assertSame(stub2, stub1);
+      assertThat(stub1.getChannel(), not(instanceOf(IsolationChannel.class)));
+
+      // Enable Isolated channels
+      
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ 
true));
+      CloudWindmillServiceV1Alpha1Stub stub3 = 
dispatcherClient.getWindmillServiceStub();
+      assertNotSame(stub3, stub1);
+
+      assertThat(stub3.getChannel(), instanceOf(IsolationChannel.class));
+      CloudWindmillServiceV1Alpha1Stub stub4 = 
dispatcherClient.getWindmillServiceStub();
+      assertSame(stub3, stub4);
+
+      // Disable Isolated channels
+      
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ 
false));
+      CloudWindmillServiceV1Alpha1Stub stub5 = 
dispatcherClient.getWindmillServiceStub();
+      assertNotSame(stub4, stub5);
+      assertThat(stub5.getChannel(), not(instanceOf(IsolationChannel.class)));
+    }
+  }
+
+  @RunWith(Parameterized.class)
+  public static class RespectsPipelineOptionsTest {
+
+    @Parameters
+    public static Collection<Object[]> data() {
+      List<Object[]> list = new ArrayList<>();
+      for (Boolean pipelineOption : new Boolean[] {true, false}) {
+        list.add(new Object[] {/*experimentEnabled=*/ false, pipelineOption});
+        list.add(new Object[] {/*experimentEnabled=*/ true, pipelineOption});
+      }
+      return list;
+    }
+
+    @Parameter(0)
+    public Boolean experimentEnabled;
+
+    @Parameter(1)
+    public Boolean pipelineOption;
+
+    @Test
+    public void ignoresIsolatedChannelsConfigWithPipelineOption() {
+      DataflowWorkerHarnessOptions options =
+          PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
+      if (experimentEnabled) {
+        options.setExperiments(
+            Lists.newArrayList(
+                
GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS));
+      }
+      options.setUseWindmillIsolatedChannels(pipelineOption);
+      GrpcDispatcherClient dispatcherClient =
+          GrpcDispatcherClient.create(options, new 
WindmillStubFactoryFactoryImpl(options));
+      Matcher<Object> classMatcher =
+          pipelineOption
+              ? instanceOf(IsolationChannel.class)
+              : not(instanceOf(IsolationChannel.class));
+
+      // Job setting disabled; the explicitly set PipelineOption decides
+      
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ 
false));
+      CloudWindmillServiceV1Alpha1Stub stub1 = 
dispatcherClient.getWindmillServiceStub();
+      CloudWindmillServiceV1Alpha1Stub stub2 = 
dispatcherClient.getWindmillServiceStub();
+      assertSame(stub2, stub1);
+      assertThat(stub1.getChannel(), classMatcher);
+
+      // Job setting enabled: ignored, so the same stub is returned
+      
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ 
true));
+      CloudWindmillServiceV1Alpha1Stub stub3 = 
dispatcherClient.getWindmillServiceStub();
+      assertSame(stub3, stub1);
+
+      CloudWindmillServiceV1Alpha1Stub stub4 = 
dispatcherClient.getWindmillServiceStub();
+      assertSame(stub3, stub4);
+
+      // Job setting disabled again: still ignored, same stub is returned
+      
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ 
false));
+      CloudWindmillServiceV1Alpha1Stub stub5 = 
dispatcherClient.getWindmillServiceStub();
+      assertSame(stub4, stub5);
+    }
+  }
+
+  static StreamingGlobalConfig getGlobalConfig(boolean 
useWindmillIsolatedChannels) {
+    return StreamingGlobalConfig.builder()
+        
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromString("windmill:1234")))
+        .setUserWorkerJobSettings(
+            UserWorkerRunnerV1Settings.newBuilder()
+                .setUseWindmillIsolatedChannels(useWindmillIsolatedChannels)
+                .build())
+        .build();
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index 7e5801b65de..239e3979a3b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -73,6 +73,7 @@ import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.Ge
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
 import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory;
 import 
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory;
 import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.CallOptions;
 import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Channel;
@@ -110,6 +111,7 @@ import org.slf4j.LoggerFactory;
   "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
 })
 public class GrpcWindmillServerTest {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcWindmillServerTest.class);
   private static final int STREAM_CHUNK_SIZE = 2 << 20;
   private final long clientId = 10L;
@@ -145,8 +147,9 @@ public class GrpcWindmillServerTest {
             name,
             experiments,
             clientId,
-            new FakeWindmillStubFactory(
-                () -> 
grpcCleanup.register(WindmillChannelFactory.inProcessChannel(name))));
+            new FakeWindmillStubFactoryFactory(
+                new FakeWindmillStubFactory(
+                    () -> 
grpcCleanup.register(WindmillChannelFactory.inProcessChannel(name)))));
   }
 
   private <Stream extends StreamObserver> void maybeInjectError(Stream stream) 
{
@@ -212,7 +215,9 @@ public class GrpcWindmillServerTest {
 
     this.client =
         GrpcWindmillServer.newApplianceTestInstance(
-            inprocessChannel, new FakeWindmillStubFactory(() -> 
(ManagedChannel) inprocessChannel));
+            inprocessChannel,
+            new FakeWindmillStubFactoryFactory(
+                new FakeWindmillStubFactory(() -> (ManagedChannel) 
inprocessChannel)));
 
     Windmill.GetWorkResponse response1 = 
client.getWork(GetWorkRequest.getDefaultInstance());
     Windmill.GetWorkResponse response2 = 
client.getWork(GetWorkRequest.getDefaultInstance());
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
new file mode 100644
index 00000000000..51f8b8e1432
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.testing;
+
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
+import 
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
+
+public class FakeWindmillStubFactoryFactory implements 
WindmillStubFactoryFactory {
+
+  private final WindmillStubFactory windmillStubFactory;
+
+  public FakeWindmillStubFactoryFactory(WindmillStubFactory 
windmillStubFactory) {
+    this.windmillStubFactory = windmillStubFactory;
+  }
+
+  @Override
+  public WindmillStubFactory makeWindmillStubFactory(boolean 
useIsolatedChannels) {
+    return windmillStubFactory;
+  }
+}


Reply via email to