This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8e8c9143ca5 [Dataflow Streaming] Use isolated windmill streams based
on job settings (#32503)
8e8c9143ca5 is described below
commit 8e8c9143ca512976e6063a4e2cd9b4041634dab0
Author: Arun Pandian <[email protected]>
AuthorDate: Thu Sep 26 04:02:30 2024 -0700
[Dataflow Streaming] Use isolated windmill streams based on job settings
(#32503)
---
.../options/DataflowStreamingPipelineOptions.java | 5 +-
.../dataflow/worker/StreamingDataflowWorker.java | 30 +---
.../windmill/client/grpc/GrpcDispatcherClient.java | 66 +++++++--
.../windmill/client/grpc/GrpcWindmillServer.java | 16 ++-
.../grpc/stubs/WindmillStubFactoryFactory.java | 25 ++++
.../grpc/stubs/WindmillStubFactoryFactoryImpl.java | 54 ++++++++
.../FanOutStreamingEngineWorkerHarnessTest.java | 9 +-
.../client/grpc/GrpcDispatcherClientTest.java | 154 +++++++++++++++++++++
.../client/grpc/GrpcWindmillServerTest.java | 11 +-
.../testing/FakeWindmillStubFactoryFactory.java | 35 +++++
10 files changed, 354 insertions(+), 51 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index 10df6e24f49..6a0208f1447 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -125,10 +125,9 @@ public interface DataflowStreamingPipelineOptions extends
PipelineOptions {
void setWindmillMessagesBetweenIsReadyChecks(int value);
@Description("If true, a most a single active rpc will be used per channel.")
- @Default.Boolean(false)
- boolean getUseWindmillIsolatedChannels();
+ Boolean getUseWindmillIsolatedChannels();
- void setUseWindmillIsolatedChannels(boolean value);
+ void setUseWindmillIsolatedChannels(Boolean value);
@Description(
"If true, separate streaming rpcs will be used for heartbeats instead of
sharing streams with state reads.")
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 8b440c306f0..ecdba404151 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.dataflow.worker;
-import static
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.api.services.dataflow.model.CounterUpdate;
@@ -63,7 +62,6 @@ import
org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
-import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
import
org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
@@ -79,10 +77,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.ChannelzServ
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcDispatcherClient;
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillServer;
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
-import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache;
-import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory;
-import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
-import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
import
org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.failures.FailureTracker;
@@ -100,7 +95,6 @@ import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySinkMetrics;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.construction.CoderTranslation;
-import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
@@ -449,7 +443,7 @@ public final class StreamingDataflowWorker {
GrpcWindmillStreamFactory windmillStreamFactory;
if (options.isEnableStreamingEngine()) {
GrpcDispatcherClient dispatcherClient =
- GrpcDispatcherClient.create(createStubFactory(options));
+ GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options));
configFetcher =
StreamingEngineComputationConfigFetcher.create(
options.getGlobalConfigRefreshPeriod().getMillis(),
dataflowServiceClient);
@@ -475,7 +469,7 @@ public final class StreamingDataflowWorker {
GrpcWindmillServer.create(
options,
windmillStreamFactory,
- GrpcDispatcherClient.create(createStubFactory(options)));
+ GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options)));
} else {
windmillStreamFactory = windmillStreamFactoryBuilder.build();
windmillServer = new
JniWindmillApplianceServer(options.getLocalWindmillHostport());
@@ -679,24 +673,6 @@ public final class StreamingDataflowWorker {
worker.start();
}
- private static ChannelCachingStubFactory createStubFactory(
- DataflowWorkerHarnessOptions workerOptions) {
- Function<WindmillServiceAddress, ManagedChannel> channelFactory =
- serviceAddress ->
- remoteChannel(
- serviceAddress,
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec());
- ChannelCache channelCache =
- ChannelCache.create(
- serviceAddress ->
- // IsolationChannel will create and manage separate RPC
channels to the same
- // serviceAddress via calling the channelFactory, else just
directly return the
- // RPC channel.
- workerOptions.getUseWindmillIsolatedChannels()
- ? IsolationChannel.create(() ->
channelFactory.apply(serviceAddress))
- : channelFactory.apply(serviceAddress));
- return
ChannelCachingRemoteStubFactory.create(workerOptions.getGcpCredential(),
channelCache);
- }
-
private static int chooseMaxThreads(DataflowWorkerHarnessOptions options) {
if (options.getNumberOfWorkerHarnessThreads() != 0) {
return options.getNumberOfWorkerHarnessThreads();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
index 412608ea398..f96464150d4 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -26,9 +26,12 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub;
@@ -36,6 +39,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Al
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
@@ -49,7 +53,8 @@ import org.slf4j.LoggerFactory;
public class GrpcDispatcherClient {
private static final Logger LOG =
LoggerFactory.getLogger(GrpcDispatcherClient.class);
- private final WindmillStubFactory windmillStubFactory;
+ static final String STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS =
+ "streaming_engine_use_job_settings_for_isolated_channels";
private final CountDownLatch onInitializedEndpoints;
/**
@@ -62,23 +67,49 @@ public class GrpcDispatcherClient {
@GuardedBy("this")
private final Random rand;
+ private final WindmillStubFactoryFactory windmillStubFactoryFactory;
+
+ private final AtomicReference<WindmillStubFactory> windmillStubFactory = new
AtomicReference<>();
+
+ private final AtomicBoolean useIsolatedChannels = new AtomicBoolean();
+ private final boolean reactToIsolatedChannelsJobSetting;
+
private GrpcDispatcherClient(
- WindmillStubFactory windmillStubFactory,
+ DataflowWorkerHarnessOptions options,
+ WindmillStubFactoryFactory windmillStubFactoryFactory,
DispatcherStubs initialDispatcherStubs,
Random rand) {
- this.windmillStubFactory = windmillStubFactory;
+ this.windmillStubFactoryFactory = windmillStubFactoryFactory;
+ if (DataflowRunner.hasExperiment(
+ options, STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS)) {
+ if (options.getUseWindmillIsolatedChannels() != null) {
+ this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
+ this.reactToIsolatedChannelsJobSetting = false;
+ } else {
+ this.useIsolatedChannels.set(false);
+ this.reactToIsolatedChannelsJobSetting = true;
+ }
+ } else {
+
this.useIsolatedChannels.set(Boolean.TRUE.equals(options.getUseWindmillIsolatedChannels()));
+ this.reactToIsolatedChannelsJobSetting = false;
+ }
+ this.windmillStubFactory.set(
+
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get()));
this.rand = rand;
this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
this.onInitializedEndpoints = new CountDownLatch(1);
}
- public static GrpcDispatcherClient create(WindmillStubFactory
windmillStubFactory) {
- return new GrpcDispatcherClient(windmillStubFactory,
DispatcherStubs.empty(), new Random());
+ public static GrpcDispatcherClient create(
+ DataflowWorkerHarnessOptions options, WindmillStubFactoryFactory
windmillStubFactoryFactory) {
+ return new GrpcDispatcherClient(
+ options, windmillStubFactoryFactory, DispatcherStubs.empty(), new
Random());
}
@VisibleForTesting
public static GrpcDispatcherClient forTesting(
- WindmillStubFactory windmillGrpcStubFactory,
+ DataflowWorkerHarnessOptions options,
+ WindmillStubFactoryFactory windmillStubFactoryFactory,
List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
List<CloudWindmillMetadataServiceV1Alpha1Stub>
windmillMetadataServiceStubs,
Set<HostAndPort> dispatcherEndpoints) {
@@ -86,7 +117,8 @@ public class GrpcDispatcherClient {
dispatcherEndpoints.size() == windmillServiceStubs.size()
&& windmillServiceStubs.size() ==
windmillMetadataServiceStubs.size());
return new GrpcDispatcherClient(
- windmillGrpcStubFactory,
+ options,
+ windmillStubFactoryFactory,
DispatcherStubs.create(
dispatcherEndpoints, windmillServiceStubs,
windmillMetadataServiceStubs),
new Random());
@@ -153,17 +185,31 @@ public class GrpcDispatcherClient {
LOG.warn("Dispatcher client received empty windmill service endpoints
from global config");
return;
}
- consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints());
+ boolean forceRecreateStubs = false;
+ if (reactToIsolatedChannelsJobSetting) {
+ boolean useIsolatedChannels =
config.userWorkerJobSettings().getUseWindmillIsolatedChannels();
+ if (this.useIsolatedChannels.getAndSet(useIsolatedChannels) !=
useIsolatedChannels) {
+ windmillStubFactory.set(
+
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels));
+ forceRecreateStubs = true;
+ }
+ }
+ consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints(),
forceRecreateStubs);
}
public synchronized void consumeWindmillDispatcherEndpoints(
ImmutableSet<HostAndPort> dispatcherEndpoints) {
+ consumeWindmillDispatcherEndpoints(dispatcherEndpoints,
/*forceRecreateStubs=*/ false);
+ }
+
+ private synchronized void consumeWindmillDispatcherEndpoints(
+ ImmutableSet<HostAndPort> dispatcherEndpoints, boolean
forceRecreateStubs) {
ImmutableSet<HostAndPort> currentDispatcherEndpoints =
dispatcherStubs.get().dispatcherEndpoints();
Preconditions.checkArgument(
dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
"Cannot set dispatcher endpoints to nothing.");
- if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
+ if (!forceRecreateStubs &&
currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
// The endpoints are equal don't recreate the stubs.
return;
}
@@ -174,7 +220,7 @@ public class GrpcDispatcherClient {
}
LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}",
dispatcherEndpoints);
- dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints,
windmillStubFactory));
+ dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints,
windmillStubFactory.get()));
onInitializedEndpoints.countDown();
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index 1fce4d238b2..31049598267 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -53,7 +53,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
-import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
import
org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
@@ -154,7 +154,7 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
String name,
List<String> experiments,
long clientId,
- WindmillStubFactory windmillStubFactory) {
+ WindmillStubFactoryFactory windmillStubFactoryFactory) {
ManagedChannel inProcessChannel = inProcessChannel(name);
CloudWindmillServiceV1Alpha1Stub stub =
CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel);
@@ -164,16 +164,18 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
List<CloudWindmillMetadataServiceV1Alpha1Stub>
windmillMetadataServiceStubs =
Lists.newArrayList(metadataStub);
+ DataflowWorkerHarnessOptions testOptions =
+ testOptions(/* enableStreamingEngine= */ true, experiments);
+
Set<HostAndPort> dispatcherEndpoints =
Sets.newHashSet(HostAndPort.fromHost(name));
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.forTesting(
- windmillStubFactory,
+ testOptions,
+ windmillStubFactoryFactory,
windmillServiceStubs,
windmillMetadataServiceStubs,
dispatcherEndpoints);
- DataflowWorkerHarnessOptions testOptions =
- testOptions(/* enableStreamingEngine= */ true, experiments);
boolean sendKeyedGetDataRequests =
!testOptions.isEnableStreamingEngine()
|| DataflowRunner.hasExperiment(
@@ -190,7 +192,7 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
@VisibleForTesting
static GrpcWindmillServer newApplianceTestInstance(
- Channel channel, WindmillStubFactory windmillStubFactory) {
+ Channel channel, WindmillStubFactoryFactory windmillStubFactoryFactory) {
DataflowWorkerHarnessOptions options =
testOptions(/* enableStreamingEngine= */ false, new ArrayList<>());
GrpcWindmillServer testServer =
@@ -198,7 +200,7 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
options,
GrpcWindmillStreamFactory.of(createJobHeader(options, 1)).build(),
// No-op, Appliance does not use Dispatcher to call Streaming
Engine.
- GrpcDispatcherClient.create(windmillStubFactory));
+ GrpcDispatcherClient.create(options, windmillStubFactoryFactory));
testServer.syncApplianceStub =
createWindmillApplianceStubWithDeadlineInterceptor(channel);
return testServer;
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
new file mode 100644
index 00000000000..f7dd9a22b99
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+public interface WindmillStubFactoryFactory {
+ WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels);
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
new file mode 100644
index 00000000000..f6ffb9c1451
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import static
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory.remoteChannel;
+
+import com.google.auth.Credentials;
+import java.util.function.Function;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+
+public class WindmillStubFactoryFactoryImpl implements
WindmillStubFactoryFactory {
+
+ private final int windmillServiceRpcChannelAliveTimeoutSec;
+ private final Credentials gcpCredential;
+
+ public WindmillStubFactoryFactoryImpl(DataflowWorkerHarnessOptions
workerOptions) {
+ this.gcpCredential = workerOptions.getGcpCredential();
+ this.windmillServiceRpcChannelAliveTimeoutSec =
+ workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec();
+ }
+
+ @Override
+ public WindmillStubFactory makeWindmillStubFactory(boolean
useIsolatedChannels) {
+ Function<WindmillServiceAddress, ManagedChannel> channelFactory =
+ serviceAddress -> remoteChannel(serviceAddress,
windmillServiceRpcChannelAliveTimeoutSec);
+ ChannelCache channelCache =
+ ChannelCache.create(
+ serviceAddress ->
+ // IsolationChannel will create and manage separate RPC
channels to the same
+ // serviceAddress via calling the channelFactory, else just
directly return the
+ // RPC channel.
+ useIsolatedChannels
+ ? IsolationChannel.create(() ->
channelFactory.apply(serviceAddress))
+ : channelFactory.apply(serviceAddress));
+ return ChannelCachingRemoteStubFactory.create(gcpCredential, channelCache);
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
index aaa71b6598e..ed8815c48e7 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
@@ -39,6 +39,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
@@ -54,10 +55,12 @@ import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmill
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory;
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory;
import
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory;
import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler;
import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetSpender;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Server;
import
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessServerBuilder;
import
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.inprocess.InProcessSocketAddress;
@@ -111,7 +114,11 @@ public class FanOutStreamingEngineWorkerHarnessTest {
WindmillChannelFactory.inProcessChannel("StreamingEngineClientTest")));
private final GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.forTesting(
- stubFactory, new ArrayList<>(), new ArrayList<>(), new HashSet<>());
+ PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class),
+ new FakeWindmillStubFactoryFactory(stubFactory),
+ new ArrayList<>(),
+ new ArrayList<>(),
+ new HashSet<>());
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
private Server fakeStreamingEngineServer;
private CountDownLatch getWorkerMetadataReady;
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
new file mode 100644
index 00000000000..3f746d91a86
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
+import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
+import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.hamcrest.Matcher;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Enclosed.class)
+public class GrpcDispatcherClientTest {
+
+ @RunWith(JUnit4.class)
+ public static class RespectsJobSettingTest {
+
+ @Test
+ public void createsNewStubWhenIsolatedChannelsConfigIsChanged() {
+ DataflowWorkerHarnessOptions options =
+ PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
+ options.setExperiments(
+ Lists.newArrayList(
+
GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS));
+ GrpcDispatcherClient dispatcherClient =
+ GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options));
+ // Create first time with Isolated channels disabled
+
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/
false));
+ CloudWindmillServiceV1Alpha1Stub stub1 =
dispatcherClient.getWindmillServiceStub();
+ CloudWindmillServiceV1Alpha1Stub stub2 =
dispatcherClient.getWindmillServiceStub();
+ assertSame(stub2, stub1);
+ assertThat(stub1.getChannel(), not(instanceOf(IsolationChannel.class)));
+
+ // Enable Isolated channels
+
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/
true));
+ CloudWindmillServiceV1Alpha1Stub stub3 =
dispatcherClient.getWindmillServiceStub();
+ assertNotSame(stub3, stub1);
+
+ assertThat(stub3.getChannel(), instanceOf(IsolationChannel.class));
+ CloudWindmillServiceV1Alpha1Stub stub4 =
dispatcherClient.getWindmillServiceStub();
+ assertSame(stub3, stub4);
+
+ // Disable Isolated channels
+
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/
false));
+ CloudWindmillServiceV1Alpha1Stub stub5 =
dispatcherClient.getWindmillServiceStub();
+ assertNotSame(stub4, stub5);
+ assertThat(stub5.getChannel(), not(instanceOf(IsolationChannel.class)));
+ }
+ }
+
+ @RunWith(Parameterized.class)
+ public static class RespectsPipelineOptionsTest {
+
+ @Parameters
+ public static Collection<Object[]> data() {
+ List<Object[]> list = new ArrayList<>();
+ for (Boolean pipelineOption : new Boolean[] {true, false}) {
+ list.add(new Object[] {/*experimentEnabled=*/ false, pipelineOption});
+ list.add(new Object[] {/*experimentEnabled=*/ true, pipelineOption});
+ }
+ return list;
+ }
+
+ @Parameter(0)
+ public Boolean experimentEnabled;
+
+ @Parameter(1)
+ public Boolean pipelineOption;
+
+ @Test
+ public void ignoresIsolatedChannelsConfigWithPipelineOption() {
+ DataflowWorkerHarnessOptions options =
+ PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
+ if (experimentEnabled) {
+ options.setExperiments(
+ Lists.newArrayList(
+
GrpcDispatcherClient.STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_ISOLATED_CHANNELS));
+ }
+ options.setUseWindmillIsolatedChannels(pipelineOption);
+ GrpcDispatcherClient dispatcherClient =
+ GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options));
+ Matcher<Object> classMatcher =
+ pipelineOption
+ ? instanceOf(IsolationChannel.class)
+ : not(instanceOf(IsolationChannel.class));
+
+ // Job setting disabled, PipelineOption enabled
+
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/
false));
+ CloudWindmillServiceV1Alpha1Stub stub1 =
dispatcherClient.getWindmillServiceStub();
+ CloudWindmillServiceV1Alpha1Stub stub2 =
dispatcherClient.getWindmillServiceStub();
+ assertSame(stub2, stub1);
+ assertThat(stub1.getChannel(), classMatcher);
+
+ // Job setting enabled
+
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/
true));
+ CloudWindmillServiceV1Alpha1Stub stub3 =
dispatcherClient.getWindmillServiceStub();
+ assertSame(stub3, stub1);
+
+ CloudWindmillServiceV1Alpha1Stub stub4 =
dispatcherClient.getWindmillServiceStub();
+ assertSame(stub3, stub4);
+
+ // Job setting disabled
+
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/
false));
+ CloudWindmillServiceV1Alpha1Stub stub5 =
dispatcherClient.getWindmillServiceStub();
+ assertSame(stub4, stub5);
+ }
+ }
+
+ static StreamingGlobalConfig getGlobalConfig(boolean
useWindmillIsolatedChannels) {
+ return StreamingGlobalConfig.builder()
+
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromString("windmill:1234")))
+ .setUserWorkerJobSettings(
+ UserWorkerRunnerV1Settings.newBuilder()
+ .setUseWindmillIsolatedChannels(useWindmillIsolatedChannels)
+ .build())
+ .build();
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index 7e5801b65de..239e3979a3b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -73,6 +73,7 @@ import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.Ge
import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannelFactory;
import
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.CallOptions;
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Channel;
@@ -110,6 +111,7 @@ import org.slf4j.LoggerFactory;
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
})
public class GrpcWindmillServerTest {
+
private static final Logger LOG =
LoggerFactory.getLogger(GrpcWindmillServerTest.class);
private static final int STREAM_CHUNK_SIZE = 2 << 20;
private final long clientId = 10L;
@@ -145,8 +147,9 @@ public class GrpcWindmillServerTest {
name,
experiments,
clientId,
- new FakeWindmillStubFactory(
- () ->
grpcCleanup.register(WindmillChannelFactory.inProcessChannel(name))));
+ new FakeWindmillStubFactoryFactory(
+ new FakeWindmillStubFactory(
+ () ->
grpcCleanup.register(WindmillChannelFactory.inProcessChannel(name)))));
}
private <Stream extends StreamObserver> void maybeInjectError(Stream stream)
{
@@ -212,7 +215,9 @@ public class GrpcWindmillServerTest {
this.client =
GrpcWindmillServer.newApplianceTestInstance(
- inprocessChannel, new FakeWindmillStubFactory(() ->
(ManagedChannel) inprocessChannel));
+ inprocessChannel,
+ new FakeWindmillStubFactoryFactory(
+ new FakeWindmillStubFactory(() -> (ManagedChannel)
inprocessChannel)));
Windmill.GetWorkResponse response1 =
client.getWork(GetWorkRequest.getDefaultInstance());
Windmill.GetWorkResponse response2 =
client.getWork(GetWorkRequest.getDefaultInstance());
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
new file mode 100644
index 00000000000..51f8b8e1432
--- /dev/null
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.testing;
+
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory;
+
+public class FakeWindmillStubFactoryFactory implements WindmillStubFactoryFactory {
+  /** Returned by every {@link #makeWindmillStubFactory} call. */
+  private final WindmillStubFactory stubFactory;
+
+  public FakeWindmillStubFactoryFactory(WindmillStubFactory windmillStubFactory) {
+    stubFactory = windmillStubFactory;
+  }
+  /** Ignores {@code useIsolatedChannels} and always returns the wrapped factory. */
+  @Override
+  public WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels) {
+    return stubFactory;
+  }
+}