scwhittle commented on code in PR #34148:
URL: https://github.com/apache/beam/pull/34148#discussion_r2121619179
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -271,7 +280,10 @@ private void consumeWorkerMetadata(WindmillEndpoints
windmillEndpoints) {
synchronized (metadataLock) {
// Only process versions greater than what we currently have to prevent
double processing of
// metadata. workerMetadataConsumer is single-threaded so we maintain
ordering.
- if (windmillEndpoints.version() > pendingMetadataVersion) {
+ // But in case the endpoint type in worker metadata is different from
the active endpoint
+ // type, also process those endpoints
+ if (windmillEndpoints.version() > pendingMetadataVersion
+ || windmillEndpoints.endpointType() != activeEndpointType) {
Review Comment:
I think the endpoint-type check should only trigger an update when the endpoint type changes AND
the version is equal to the current (pending) version. We don't want to move backwards, e.g. from
version 5 to version 3, even if the active endpoint type changes.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java:
##########
@@ -296,6 +326,70 @@ public void
testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers()
assertTrue(currentBackends.globalDataStreams().isEmpty());
}
+ @Test
+ public void testOnNewWorkerMetadata_endpointTypeChanged() throws
InterruptedException {
+ GetWorkBudgetDistributor getWorkBudgetDistributor =
mock(GetWorkBudgetDistributor.class);
+ fanOutStreamingEngineWorkProvider =
+ newFanOutStreamingEngineWorkerHarness(
+ GetWorkBudget.builder().setItems(1).setBytes(1).build(),
+ getWorkBudgetDistributor,
+ noOpProcessWorkItemFn());
+
+ String workerToken = "workerToken1";
+ String workerToken2 = "workerToken2";
+
+ WorkerMetadataResponse firstWorkerMetadata =
+ WorkerMetadataResponse.newBuilder()
+ .setMetadataVersion(1)
+ .addWorkEndpoints(
+ WorkerMetadataResponse.Endpoint.newBuilder()
+ .setBackendWorkerToken(workerToken)
+ .build())
+ .addWorkEndpoints(
+ WorkerMetadataResponse.Endpoint.newBuilder()
+ .setBackendWorkerToken(workerToken2)
+ .build())
+ .setExternalEndpoint(AUTHENTICATING_SERVICE)
+ .setEndpointType(EndpointType.DIRECTPATH)
+ .putAllGlobalDataEndpoints(DEFAULT)
+ .build();
+
+ WorkerMetadataResponse secondWorkerMetadata =
+ WorkerMetadataResponse.newBuilder()
+ .setMetadataVersion(2)
Review Comment:
Please add test variants that check that metadata with the same version but a different endpoint
type is actuated, and that metadata with an earlier version and a different endpoint type is ignored.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamPoolSender.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
+import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
+import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Owns and maintains a pool of streams used to fetch {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem}(s) from
a specific source.
+ */
+@Internal
+@ThreadSafe
+public final class WindmillStreamPoolSender implements WindmillStreamSender {
+ private static final Logger LOG =
LoggerFactory.getLogger(WindmillStreamPoolSender.class);
+ private static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
+ private final AtomicReference<GetWorkBudget> getWorkBudget;
+ private final WindmillConnection connection;
+ private final GetWorkRequest getWorkRequest;
+ private final GrpcWindmillStreamFactory streamingEngineStreamFactory;
+ private final WorkCommitter workCommitter;
+ private final GetDataClient getDataClient;
+ private final HeartbeatSender heartbeatSender;
+ private final StreamingWorkScheduler streamingWorkScheduler;
+ private final Runnable waitForResources;
+ private final Function<String, Optional<ComputationState>>
computationStateFetcher;
+ private final ExecutorService workProviderExecutor;
+ private final AtomicBoolean isRunning;
+ private final AtomicBoolean hasGetWorkStreamStarted;
+ private @Nullable GetWorkStream getWorkStream;
+
+ private WindmillStreamPoolSender(
+ WindmillConnection connection,
+ GetWorkRequest getWorkRequest,
+ AtomicReference<GetWorkBudget> getWorkBudget,
+ GrpcWindmillStreamFactory streamingEngineStreamFactory,
+ WorkCommitter workCommitter,
+ GetDataClient getDataClient,
+ HeartbeatSender heartbeatSender,
+ StreamingWorkScheduler streamingWorkScheduler,
+ Runnable waitForResources,
+ Function<String, Optional<ComputationState>> computationStateFetcher) {
+ this.isRunning = new AtomicBoolean(false);
+ this.hasGetWorkStreamStarted = new AtomicBoolean(false);
+ this.connection = connection;
+ this.getWorkRequest = getWorkRequest;
+ this.getWorkBudget = getWorkBudget;
+ this.streamingEngineStreamFactory = streamingEngineStreamFactory;
+ this.getDataClient = getDataClient;
+ this.heartbeatSender = heartbeatSender;
+ this.streamingWorkScheduler = streamingWorkScheduler;
+ this.waitForResources = waitForResources;
+ this.computationStateFetcher = computationStateFetcher;
+ this.workCommitter = workCommitter;
+ this.workProviderExecutor =
+ Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setPriority(Thread.MIN_PRIORITY)
+ .setNameFormat("DispatchThread")
+ .build());
+ }
+
+ public static WindmillStreamPoolSender create(
+ WindmillConnection connection,
+ GetWorkRequest getWorkRequest,
+ GetWorkBudget getWorkBudget,
+ GrpcWindmillStreamFactory streamingEngineStreamFactory,
+ WorkCommitter workCommitter,
+ GetDataClient getDataClient,
+ HeartbeatSender heartbeatSender,
+ StreamingWorkScheduler streamingWorkScheduler,
+ Runnable waitForResources,
+ Function<String, Optional<ComputationState>> computationStateFetcher) {
+ return new WindmillStreamPoolSender(
+ connection,
+ getWorkRequest,
+ new AtomicReference<>(getWorkBudget),
+ streamingEngineStreamFactory,
+ workCommitter,
+ getDataClient,
+ heartbeatSender,
+ streamingWorkScheduler,
+ waitForResources,
+ computationStateFetcher);
+ }
+
+ private void dispatchLoop() {
+ while (isRunning.get()) {
+ this.getWorkStream =
Review Comment:
This assignment to getWorkStream races with reads in setBudget; a thread sanitizer (TSan) will flag it.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -243,17 +245,31 @@ private StreamingDataflowWorker(
@Nullable ChannelzServlet channelzServlet = null;
Consumer<PrintWriter> getDataStatusProvider;
Supplier<Long> currentActiveCommitBytesProvider;
-
- if (options.isEnableStreamingEngine() &&
options.getIsWindmillServiceDirectPathEnabled()) {
- // Direct path pipelines.
+ WindmillStreamPool<GetDataStream> getDataStreamPool =
Review Comment:
But getDataStreamPool isn't used to construct ApplianceGetDataClient (we
don't have streaming RPCs to Appliance).
It seems we should avoid creating the streaming RPC pool in this case.
Perhaps make getDataStreamPool nullable and inline the creation of
getDataClient in this method:
```
if (appliance) {
getDataClient = ...
} else {
getDataStreamPool = ...
getDataClient = ...
}
// existing cases
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamPoolSender.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
+import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
+import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
+import
org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
+import
org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
+import org.apache.beam.sdk.annotations.Internal;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Owns and maintains a pool of streams used to fetch {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem}(s) from
a specific source.
+ */
+@Internal
+@ThreadSafe
+public final class WindmillStreamPoolSender implements WindmillStreamSender {
+ private static final Logger LOG =
LoggerFactory.getLogger(WindmillStreamPoolSender.class);
+ private static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
+ private final AtomicReference<GetWorkBudget> getWorkBudget;
+ private final WindmillConnection connection;
+ private final GetWorkRequest getWorkRequest;
+ private final GrpcWindmillStreamFactory streamingEngineStreamFactory;
+ private final WorkCommitter workCommitter;
+ private final GetDataClient getDataClient;
+ private final HeartbeatSender heartbeatSender;
+ private final StreamingWorkScheduler streamingWorkScheduler;
+ private final Runnable waitForResources;
+ private final Function<String, Optional<ComputationState>>
computationStateFetcher;
+ private final ExecutorService workProviderExecutor;
+ private final AtomicBoolean isRunning;
+ private final AtomicBoolean hasGetWorkStreamStarted;
+ private @Nullable GetWorkStream getWorkStream;
+
+ private WindmillStreamPoolSender(
+ WindmillConnection connection,
+ GetWorkRequest getWorkRequest,
+ AtomicReference<GetWorkBudget> getWorkBudget,
+ GrpcWindmillStreamFactory streamingEngineStreamFactory,
+ WorkCommitter workCommitter,
+ GetDataClient getDataClient,
+ HeartbeatSender heartbeatSender,
+ StreamingWorkScheduler streamingWorkScheduler,
+ Runnable waitForResources,
+ Function<String, Optional<ComputationState>> computationStateFetcher) {
+ this.isRunning = new AtomicBoolean(false);
+ this.hasGetWorkStreamStarted = new AtomicBoolean(false);
+ this.connection = connection;
+ this.getWorkRequest = getWorkRequest;
+ this.getWorkBudget = getWorkBudget;
+ this.streamingEngineStreamFactory = streamingEngineStreamFactory;
+ this.getDataClient = getDataClient;
+ this.heartbeatSender = heartbeatSender;
+ this.streamingWorkScheduler = streamingWorkScheduler;
+ this.waitForResources = waitForResources;
+ this.computationStateFetcher = computationStateFetcher;
+ this.workCommitter = workCommitter;
+ this.workProviderExecutor =
+ Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setPriority(Thread.MIN_PRIORITY)
+ .setNameFormat("DispatchThread")
+ .build());
+ }
+
+ public static WindmillStreamPoolSender create(
+ WindmillConnection connection,
+ GetWorkRequest getWorkRequest,
+ GetWorkBudget getWorkBudget,
+ GrpcWindmillStreamFactory streamingEngineStreamFactory,
+ WorkCommitter workCommitter,
+ GetDataClient getDataClient,
+ HeartbeatSender heartbeatSender,
+ StreamingWorkScheduler streamingWorkScheduler,
+ Runnable waitForResources,
+ Function<String, Optional<ComputationState>> computationStateFetcher) {
+ return new WindmillStreamPoolSender(
+ connection,
+ getWorkRequest,
+ new AtomicReference<>(getWorkBudget),
+ streamingEngineStreamFactory,
+ workCommitter,
+ getDataClient,
+ heartbeatSender,
+ streamingWorkScheduler,
+ waitForResources,
+ computationStateFetcher);
+ }
+
+ private void dispatchLoop() {
+ while (isRunning.get()) {
+ this.getWorkStream =
+ streamingEngineStreamFactory.createGetWorkStream(
+ connection.currentStub(), getWorkRequest, getWorkItemReceiver());
+ this.getWorkStream.start();
+ this.hasGetWorkStreamStarted.set(true);
+
+ try {
+ // Reconnect every now and again to enable better load balancing.
+ // If at any point the server closes the stream, we will reconnect
immediately;
+ // otherwise
+ // we half-close the stream after some time and create a new one.
+ if (this.getWorkStream != null
+ && !this.getWorkStream.awaitTermination(
+ GET_WORK_STREAM_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
+ if (this.getWorkStream != null) {
+ this.getWorkStream.halfClose();
+ }
+ }
+ } catch (InterruptedException e) {
+ // Continue processing
+ }
+ }
+ }
+
+ private WorkItemReceiver getWorkItemReceiver() {
Review Comment:
I don't really like the duplication here between this and
SingleSourceWorkerHarness in Streaming Engine (SE) mode.
Could this just create a SingleSourceWorkerHarness and start it?
It seems the only missing piece is setBudget support; perhaps that could
be added to SingleSourceWorkerHarness. Alternatively, could we have a
shared static method that constructs this receiver for both classes?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -281,11 +293,14 @@ private void consumeWorkerMetadata(WindmillEndpoints
windmillEndpoints) {
private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints
newWindmillEndpoints) {
// Since this is run on a single threaded executor, multiple versions of
the metadata maybe
// queued up while a previous version of the windmillEndpoints were being
consumed. Only consume
- // the endpoints if they are the most current version.
+ // the endpoints if they are the most current version, or if the endpoint
type is different
+ // from currently active endpoints.
synchronized (metadataLock) {
- if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+ if (newWindmillEndpoints.version() < pendingMetadataVersion
+ && newWindmillEndpoints.endpointType() == activeEndpointType) {
Review Comment:
Ditto. The skip condition should be
(version < pending) || (version == pending && type == activeType)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]