This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 02d8f56a1e8 [Dataflow Streaming] Remove rolled out Windmill isolated
channels flag (#37844)
02d8f56a1e8 is described below
commit 02d8f56a1e876d135ae157b3b4aa2ceed2be16c0
Author: Arun Pandian <[email protected]>
AuthorDate: Fri Mar 13 01:53:09 2026 -0700
[Dataflow Streaming] Remove rolled out Windmill isolated channels flag
(#37844)
---
.../options/DataflowStreamingPipelineOptions.java | 4 +-
.../dataflow/worker/StreamingDataflowWorker.java | 6 +-
.../windmill/client/grpc/GrpcDispatcherClient.java | 48 +------
.../windmill/client/grpc/GrpcWindmillServer.java | 3 +-
.../grpc/stubs/WindmillStubFactoryFactory.java | 2 +-
.../grpc/stubs/WindmillStubFactoryFactoryImpl.java | 19 +--
.../FanOutStreamingEngineWorkerHarnessTest.java | 3 -
.../client/grpc/GrpcDispatcherClientTest.java | 141 ---------------------
.../testing/FakeWindmillStubFactoryFactory.java | 2 +-
.../worker/windmill/src/main/proto/windmill.proto | 5 +-
10 files changed, 25 insertions(+), 208 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index 9cc98276f2d..2e0e7efd73a 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -126,7 +126,9 @@ public interface DataflowStreamingPipelineOptions extends
PipelineOptions {
void setWindmillMessagesBetweenIsReadyChecks(int value);
- @Description("If true, a most a single active rpc will be used per channel.")
+ /** @deprecated since 2.73.0 */
+ @Deprecated
+ @Description("Unused flag.")
Boolean getUseWindmillIsolatedChannels();
void setUseWindmillIsolatedChannels(Boolean value);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 98f596141ee..1b13b72986f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -725,7 +725,7 @@ public final class StreamingDataflowWorker {
Function<ComputationConfig.Fetcher, ComputationStateCache>
computationStateCacheFactory) {
if (options.isEnableStreamingEngine()) {
GrpcDispatcherClient dispatcherClient =
- GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options));
+ GrpcDispatcherClient.create(new
WindmillStubFactoryFactoryImpl(options));
ComputationConfig.Fetcher configFetcher =
StreamingEngineComputationConfigFetcher.create(
options.getGlobalConfigRefreshPeriod().getMillis(),
dataflowServiceClient);
@@ -753,7 +753,7 @@ public final class StreamingDataflowWorker {
if (options.getWindmillServiceEndpoint() != null
|| options.getLocalWindmillHostport().startsWith("grpc:")) {
GrpcDispatcherClient dispatcherClient =
- GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options));
+ GrpcDispatcherClient.create(new
WindmillStubFactoryFactoryImpl(options));
GrpcWindmillStreamFactory windmillStreamFactory =
windmillStreamFactoryBuilder
.setHealthCheckIntervalMillis(
@@ -920,7 +920,7 @@ public final class StreamingDataflowWorker {
createGrpcwindmillStreamFactoryBuilder(options, 1)
.setProcessHeartbeatResponses(
new
WorkHeartbeatResponseProcessor(computationStateCache::get));
- GrpcDispatcherClient grpcDispatcherClient =
GrpcDispatcherClient.create(options, stubFactory);
+ GrpcDispatcherClient grpcDispatcherClient =
GrpcDispatcherClient.create(stubFactory);
grpcDispatcherClient.consumeWindmillDispatcherEndpoints(
ImmutableSet.<HostAndPort>builder()
.add(HostAndPort.fromHost("StreamingDataflowWorkerTest"))
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
index 82e66c4b0d7..3a0773af7fa 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java
@@ -28,11 +28,9 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub;
@@ -66,42 +64,25 @@ public class GrpcDispatcherClient {
@GuardedBy("this")
private final Random rand;
- private final WindmillStubFactoryFactory windmillStubFactoryFactory;
-
- private final AtomicReference<WindmillStubFactory> windmillStubFactory = new
AtomicReference<>();
-
- private final AtomicBoolean useIsolatedChannels = new AtomicBoolean();
- private final boolean reactToIsolatedChannelsJobSetting;
+ private final WindmillStubFactory windmillStubFactory;
private GrpcDispatcherClient(
- DataflowWorkerHarnessOptions options,
WindmillStubFactoryFactory windmillStubFactoryFactory,
DispatcherStubs initialDispatcherStubs,
Random rand) {
- this.windmillStubFactoryFactory = windmillStubFactoryFactory;
- if (options.getUseWindmillIsolatedChannels() != null) {
- this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels());
- this.reactToIsolatedChannelsJobSetting = false;
- } else {
- this.useIsolatedChannels.set(false);
- this.reactToIsolatedChannelsJobSetting = true;
- }
- this.windmillStubFactory.set(
-
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get()));
+ this.windmillStubFactory =
windmillStubFactoryFactory.makeWindmillStubFactory();
this.rand = rand;
this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
this.onInitializedEndpoints = new CountDownLatch(1);
}
- public static GrpcDispatcherClient create(
- DataflowWorkerHarnessOptions options, WindmillStubFactoryFactory
windmillStubFactoryFactory) {
+ public static GrpcDispatcherClient create(WindmillStubFactoryFactory
windmillStubFactoryFactory) {
return new GrpcDispatcherClient(
- options, windmillStubFactoryFactory, DispatcherStubs.empty(), new
Random());
+ windmillStubFactoryFactory, DispatcherStubs.empty(), new Random());
}
@VisibleForTesting
public static GrpcDispatcherClient forTesting(
- DataflowWorkerHarnessOptions options,
WindmillStubFactoryFactory windmillStubFactoryFactory,
List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
List<CloudWindmillMetadataServiceV1Alpha1Stub>
windmillMetadataServiceStubs,
@@ -110,7 +91,6 @@ public class GrpcDispatcherClient {
dispatcherEndpoints.size() == windmillServiceStubs.size()
&& windmillServiceStubs.size() ==
windmillMetadataServiceStubs.size());
return new GrpcDispatcherClient(
- options,
windmillStubFactoryFactory,
DispatcherStubs.create(
dispatcherEndpoints, windmillServiceStubs,
windmillMetadataServiceStubs),
@@ -172,31 +152,17 @@ public class GrpcDispatcherClient {
LOG.warn("Dispatcher client received empty windmill service endpoints
from global config");
return;
}
- boolean forceRecreateStubs = false;
- if (reactToIsolatedChannelsJobSetting) {
- boolean useIsolatedChannels =
config.userWorkerJobSettings().getUseWindmillIsolatedChannels();
- if (this.useIsolatedChannels.getAndSet(useIsolatedChannels) !=
useIsolatedChannels) {
- windmillStubFactory.set(
-
windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels));
- forceRecreateStubs = true;
- }
- }
- consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints(),
forceRecreateStubs);
+ consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints());
}
public synchronized void consumeWindmillDispatcherEndpoints(
ImmutableSet<HostAndPort> dispatcherEndpoints) {
- consumeWindmillDispatcherEndpoints(dispatcherEndpoints, /*
forceRecreateStubs= */ false);
- }
-
- private synchronized void consumeWindmillDispatcherEndpoints(
- ImmutableSet<HostAndPort> dispatcherEndpoints, boolean
forceRecreateStubs) {
ImmutableSet<HostAndPort> currentDispatcherEndpoints =
dispatcherStubs.get().dispatcherEndpoints();
Preconditions.checkArgument(
dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
"Cannot set dispatcher endpoints to nothing.");
- if (!forceRecreateStubs &&
currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
+ if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
// The endpoints are equal don't recreate the stubs.
return;
}
@@ -207,7 +173,7 @@ public class GrpcDispatcherClient {
}
LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}",
dispatcherEndpoints);
- dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints,
windmillStubFactory.get()));
+ dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints,
windmillStubFactory));
onInitializedEndpoints.countDown();
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
index 1cd2d2f5315..4d5acdc8071 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java
@@ -166,7 +166,6 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
Set<HostAndPort> dispatcherEndpoints =
Sets.newHashSet(HostAndPort.fromHost(name));
GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.forTesting(
- testOptions,
windmillStubFactoryFactory,
windmillServiceStubs,
windmillMetadataServiceStubs,
@@ -198,7 +197,7 @@ public final class GrpcWindmillServer extends
WindmillServerStub {
options,
GrpcWindmillStreamFactory.of(createJobHeader(options, 1)).build(),
// No-op, Appliance does not use Dispatcher to call Streaming
Engine.
- GrpcDispatcherClient.create(options, windmillStubFactoryFactory));
+ GrpcDispatcherClient.create(windmillStubFactoryFactory));
testServer.syncApplianceStub =
createWindmillApplianceStubWithDeadlineInterceptor(channel);
return testServer;
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
index f7dd9a22b99..8daff4faceb 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java
@@ -21,5 +21,5 @@ import org.apache.beam.sdk.annotations.Internal;
@Internal
public interface WindmillStubFactoryFactory {
- WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels);
+ WindmillStubFactory makeWindmillStubFactory();
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
index 6de6b633753..00dfb155c29 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java
@@ -33,24 +33,19 @@ public class WindmillStubFactoryFactoryImpl implements
WindmillStubFactoryFactor
}
@Override
- public WindmillStubFactory makeWindmillStubFactory(boolean
useIsolatedChannels) {
+ public WindmillStubFactory makeWindmillStubFactory() {
ChannelCache channelCache =
ChannelCache.create(
(flowControlSettings, serviceAddress) ->
// IsolationChannel will create and manage separate RPC
channels to the same
// serviceAddress via calling the channelFactory, else just
directly return the
// RPC channel.
- useIsolatedChannels
- ? IsolationChannel.create(
- () ->
- remoteChannel(
- serviceAddress.getServiceAddress(),
- windmillServiceRpcChannelAliveTimeoutSec,
- flowControlSettings))
- : remoteChannel(
- serviceAddress.getServiceAddress(),
- windmillServiceRpcChannelAliveTimeoutSec,
- flowControlSettings));
+ IsolationChannel.create(
+ () ->
+ remoteChannel(
+ serviceAddress.getServiceAddress(),
+ windmillServiceRpcChannelAliveTimeoutSec,
+ flowControlSettings)));
return ChannelCachingRemoteStubFactory.create(gcpCredential, channelCache);
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
index 94c8f4b7595..441319b66a8 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
@@ -58,7 +57,6 @@ import
org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler;
import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetSpender;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Server;
import
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessServerBuilder;
import
org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessSocketAddress;
@@ -112,7 +110,6 @@ public class FanOutStreamingEngineWorkerHarnessTest {
() ->
grpcCleanup.register(WindmillChannels.inProcessChannel(CHANNEL_NAME)));
private final GrpcDispatcherClient dispatcherClient =
GrpcDispatcherClient.forTesting(
- PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class),
new FakeWindmillStubFactoryFactory(stubFactory),
new ArrayList<>(),
new ArrayList<>(),
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
deleted file mode 100644
index c04456906ea..00000000000
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.windmill.client.grpc;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertSame;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
-import
org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
-import
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
-import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings;
-import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel;
-import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
-import org.hamcrest.Matcher;
-import org.junit.Test;
-import org.junit.experimental.runners.Enclosed;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Enclosed.class)
-public class GrpcDispatcherClientTest {
-
- @RunWith(JUnit4.class)
- public static class RespectsJobSettingTest {
-
- @Test
- public void createsNewStubWhenIsolatedChannelsConfigIsChanged() {
- DataflowWorkerHarnessOptions options =
- PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
- GrpcDispatcherClient dispatcherClient =
- GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options));
- // Create first time with Isolated channels disabled
-
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/
false));
- CloudWindmillServiceV1Alpha1Stub stub1 =
dispatcherClient.getWindmillServiceStub();
- CloudWindmillServiceV1Alpha1Stub stub2 =
dispatcherClient.getWindmillServiceStub();
- assertSame(stub2, stub1);
- assertThat(stub1.getChannel(), not(instanceOf(IsolationChannel.class)));
-
- // Enable Isolated channels
-
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/
true));
- CloudWindmillServiceV1Alpha1Stub stub3 =
dispatcherClient.getWindmillServiceStub();
- assertNotSame(stub3, stub1);
-
- assertThat(stub3.getChannel(), instanceOf(IsolationChannel.class));
- CloudWindmillServiceV1Alpha1Stub stub4 =
dispatcherClient.getWindmillServiceStub();
- assertSame(stub3, stub4);
-
- // Disable Isolated channels
-
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/
false));
- CloudWindmillServiceV1Alpha1Stub stub5 =
dispatcherClient.getWindmillServiceStub();
- assertNotSame(stub4, stub5);
- assertThat(stub5.getChannel(), not(instanceOf(IsolationChannel.class)));
- }
- }
-
- @RunWith(Parameterized.class)
- public static class RespectsPipelineOptionsTest {
-
- @Parameters
- public static Collection<Object[]> data() {
- List<Object[]> list = new ArrayList<>();
- for (Boolean pipelineOption : new Boolean[] {true, false}) {
- list.add(new Object[] {pipelineOption});
- }
- return list;
- }
-
- @Parameter(0)
- public Boolean pipelineOption;
-
- @Test
- public void ignoresIsolatedChannelsConfigWithPipelineOption() {
- DataflowWorkerHarnessOptions options =
- PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
- options.setUseWindmillIsolatedChannels(pipelineOption);
- GrpcDispatcherClient dispatcherClient =
- GrpcDispatcherClient.create(options, new
WindmillStubFactoryFactoryImpl(options));
- Matcher<Object> classMatcher =
- pipelineOption
- ? instanceOf(IsolationChannel.class)
- : not(instanceOf(IsolationChannel.class));
-
- // Job setting disabled, PipelineOption enabled
-
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/
false));
- CloudWindmillServiceV1Alpha1Stub stub1 =
dispatcherClient.getWindmillServiceStub();
- CloudWindmillServiceV1Alpha1Stub stub2 =
dispatcherClient.getWindmillServiceStub();
- assertSame(stub2, stub1);
- assertThat(stub1.getChannel(), classMatcher);
-
- // Job setting enabled
-
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/
true));
- CloudWindmillServiceV1Alpha1Stub stub3 =
dispatcherClient.getWindmillServiceStub();
- assertSame(stub3, stub1);
-
- CloudWindmillServiceV1Alpha1Stub stub4 =
dispatcherClient.getWindmillServiceStub();
- assertSame(stub3, stub4);
-
- // Job setting disabled
-
dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/
false));
- CloudWindmillServiceV1Alpha1Stub stub5 =
dispatcherClient.getWindmillServiceStub();
- assertSame(stub4, stub5);
- }
- }
-
- static StreamingGlobalConfig getGlobalConfig(boolean
useWindmillIsolatedChannels) {
- return StreamingGlobalConfig.builder()
-
.setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromString("windmill:1234")))
- .setUserWorkerJobSettings(
- UserWorkerRunnerV1Settings.newBuilder()
- .setUseWindmillIsolatedChannels(useWindmillIsolatedChannels)
- .build())
- .build();
- }
-}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
index 51f8b8e1432..741b025e601 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java
@@ -29,7 +29,7 @@ public class FakeWindmillStubFactoryFactory implements
WindmillStubFactoryFactor
}
@Override
- public WindmillStubFactory makeWindmillStubFactory(boolean
useIsolatedChannels) {
+ public WindmillStubFactory makeWindmillStubFactory() {
return windmillStubFactory;
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index a4b3df906dd..8c6cc08f4d5 100644
---
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -966,9 +966,6 @@ enum ConnectivityType {
// Settings to control runtime behavior of the java runner v1 user worker.
message UserWorkerRunnerV1Settings {
- // If true, use separate channels for each windmill RPC.
- optional bool use_windmill_isolated_channels = 1 [default = true];
-
// If true, use separate streaming RPC for windmill heartbeats and state
reads.
optional bool use_separate_windmill_heartbeat_streams = 2 [default = true];
@@ -976,6 +973,8 @@ message UserWorkerRunnerV1Settings {
optional ConnectivityType connectivity_type = 4
[default = CONNECTIVITY_TYPE_DEFAULT];
+
+ reserved 1;
}
service WindmillAppliance {