scwhittle commented on code in PR #34148:
URL: https://github.com/apache/beam/pull/34148#discussion_r2121619179


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -271,7 +280,10 @@ private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
     synchronized (metadataLock) {
       // Only process versions greater than what we currently have to prevent double processing of
       // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
-      if (windmillEndpoints.version() > pendingMetadataVersion) {
+      // But in case the endpoint type in worker metadata is different from the active endpoint
+      // type, also process those endpoints
+      if (windmillEndpoints.version() > pendingMetadataVersion
+          || windmillEndpoints.endpointType() != activeEndpointType) {

Review Comment:
   I think the endpoint-type change should only trigger an update when the version is equal to the pending version. We don't want to move backwards from version 5 to version 3, for example, even if the active endpoint type changes.
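A minimal sketch of the check this comment suggests, written as a standalone predicate. `MetadataAcceptGate`, `shouldProcess`, and the nested `EndpointType` enum are hypothetical stand-ins for the harness fields (`pendingMetadataVersion`, `activeEndpointType`) and the real proto enum, not Beam APIs; the `CLOUDPATH` value is assumed only to have a second type to compare against.

```java
/**
 * Hypothetical stand-in for the version check in consumeWorkerMetadata(). EndpointType here
 * mirrors the proto enum only for illustration (CLOUDPATH is an assumed second value).
 */
final class MetadataAcceptGate {
  enum EndpointType {
    CLOUDPATH,
    DIRECTPATH
  }

  private MetadataAcceptGate() {}

  /**
   * Processes strictly newer metadata, or metadata with the same version whose endpoint type
   * differs from the active one. Older versions are ignored even if their endpoint type differs,
   * so the harness never moves backwards (e.g. from version 5 to version 3).
   */
  static boolean shouldProcess(
      long pendingVersion,
      EndpointType activeType,
      long incomingVersion,
      EndpointType incomingType) {
    if (incomingVersion > pendingVersion) {
      return true;
    }
    return incomingVersion == pendingVersion && incomingType != activeType;
  }
}
```

Compared with the condition in the diff, the only change is that the endpoint-type comparison is gated on `version == pending`, so a version-3 message arriving after version 5 is dropped regardless of its type.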



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java:
##########
@@ -296,6 +326,70 @@ public void testOnNewWorkerMetadata_correctlyRemovesStaleWindmillServers()
     assertTrue(currentBackends.globalDataStreams().isEmpty());
   }
 
+  @Test
+  public void testOnNewWorkerMetadata_endpointTypeChanged() throws InterruptedException {
+    GetWorkBudgetDistributor getWorkBudgetDistributor = mock(GetWorkBudgetDistributor.class);
+    fanOutStreamingEngineWorkProvider =
+        newFanOutStreamingEngineWorkerHarness(
+            GetWorkBudget.builder().setItems(1).setBytes(1).build(),
+            getWorkBudgetDistributor,
+            noOpProcessWorkItemFn());
+
+    String workerToken = "workerToken1";
+    String workerToken2 = "workerToken2";
+
+    WorkerMetadataResponse firstWorkerMetadata =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(1)
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken(workerToken)
+                    .build())
+            .addWorkEndpoints(
+                WorkerMetadataResponse.Endpoint.newBuilder()
+                    .setBackendWorkerToken(workerToken2)
+                    .build())
+            .setExternalEndpoint(AUTHENTICATING_SERVICE)
+            .setEndpointType(EndpointType.DIRECTPATH)
+            .putAllGlobalDataEndpoints(DEFAULT)
+            .build();
+
+    WorkerMetadataResponse secondWorkerMetadata =
+        WorkerMetadataResponse.newBuilder()
+            .setMetadataVersion(2)

Review Comment:
   Please add variants that check that metadata with the same version but a different endpoint type is actuated, and that an earlier version with a different endpoint type is ignored.
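One way to cover these variants without duplicating the whole test body is a case table. The sketch below is hypothetical and self-contained: it runs the cases against a simplified copy of the version/endpoint-type gate rather than against `FanOutStreamingEngineWorkerHarness`, and `EndpointTypeChangeCases`, `Case`, and `failures()` are illustrative names. In the real test, each row would instead feed a `WorkerMetadataResponse` built as in the diff and assert on the resulting backends.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical case table for the endpoint-type-change variants; not Beam test code. */
final class EndpointTypeChangeCases {
  enum EndpointType {
    CLOUDPATH,
    DIRECTPATH
  }

  static final class Case {
    final String name;
    final long incomingVersion;
    final EndpointType incomingType;
    final boolean expectActuated;

    Case(String name, long incomingVersion, EndpointType incomingType, boolean expectActuated) {
      this.name = name;
      this.incomingVersion = incomingVersion;
      this.incomingType = incomingType;
      this.expectActuated = expectActuated;
    }
  }

  // Assumed harness state after consuming metadata version 2 over DIRECTPATH.
  static final long PENDING_VERSION = 2;
  static final EndpointType ACTIVE_TYPE = EndpointType.DIRECTPATH;

  static final List<Case> CASES =
      Arrays.asList(
          new Case("newer version, same type", 3, EndpointType.DIRECTPATH, true),
          new Case("same version, different type", 2, EndpointType.CLOUDPATH, true),
          new Case("earlier version, different type", 1, EndpointType.CLOUDPATH, false),
          new Case("same version, same type", 2, EndpointType.DIRECTPATH, false));

  private EndpointTypeChangeCases() {}

  // Simplified gate under test; mirrors the condition suggested in the first comment.
  static boolean isActuated(long incomingVersion, EndpointType incomingType) {
    return incomingVersion > PENDING_VERSION
        || (incomingVersion == PENDING_VERSION && incomingType != ACTIVE_TYPE);
  }

  /** Returns the names of cases whose outcome differs from the expectation. */
  static List<String> failures() {
    List<String> failed = new ArrayList<>();
    for (Case c : CASES) {
      if (isActuated(c.incomingVersion, c.incomingType) != c.expectActuated) {
        failed.add(c.name);
      }
    }
    return failed;
  }
}
```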



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamPoolSender.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming.harness;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
+import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
+import org.apache.beam.runners.dataflow.worker.streaming.Work;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection;
+import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream;
+import org.apache.beam.runners.dataflow.worker.windmill.client.commits.WorkCommitter;
+import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.GetDataClient;
+import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.runners.dataflow.worker.windmill.work.processing.StreamingWorkScheduler;
+import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Owns and maintains a pool of streams used to fetch {@link
+ * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem}(s) from a specific source.
+ */
+@Internal
+@ThreadSafe
+public final class WindmillStreamPoolSender implements WindmillStreamSender {
+  private static final Logger LOG = LoggerFactory.getLogger(WindmillStreamPoolSender.class);
+  private static final int GET_WORK_STREAM_TIMEOUT_MINUTES = 3;
+  private final AtomicReference<GetWorkBudget> getWorkBudget;
+  private final WindmillConnection connection;
+  private final GetWorkRequest getWorkRequest;
+  private final GrpcWindmillStreamFactory streamingEngineStreamFactory;
+  private final WorkCommitter workCommitter;
+  private final GetDataClient getDataClient;
+  private final HeartbeatSender heartbeatSender;
+  private final StreamingWorkScheduler streamingWorkScheduler;
+  private final Runnable waitForResources;
+  private final Function<String, Optional<ComputationState>> computationStateFetcher;
+  private final ExecutorService workProviderExecutor;
+  private final AtomicBoolean isRunning;
+  private final AtomicBoolean hasGetWorkStreamStarted;
+  private @Nullable GetWorkStream getWorkStream;
+
+  private WindmillStreamPoolSender(
+      WindmillConnection connection,
+      GetWorkRequest getWorkRequest,
+      AtomicReference<GetWorkBudget> getWorkBudget,
+      GrpcWindmillStreamFactory streamingEngineStreamFactory,
+      WorkCommitter workCommitter,
+      GetDataClient getDataClient,
+      HeartbeatSender heartbeatSender,
+      StreamingWorkScheduler streamingWorkScheduler,
+      Runnable waitForResources,
+      Function<String, Optional<ComputationState>> computationStateFetcher) {
+    this.isRunning = new AtomicBoolean(false);
+    this.hasGetWorkStreamStarted = new AtomicBoolean(false);
+    this.connection = connection;
+    this.getWorkRequest = getWorkRequest;
+    this.getWorkBudget = getWorkBudget;
+    this.streamingEngineStreamFactory = streamingEngineStreamFactory;
+    this.getDataClient = getDataClient;
+    this.heartbeatSender = heartbeatSender;
+    this.streamingWorkScheduler = streamingWorkScheduler;
+    this.waitForResources = waitForResources;
+    this.computationStateFetcher = computationStateFetcher;
+    this.workCommitter = workCommitter;
+    this.workProviderExecutor =
+        Executors.newSingleThreadExecutor(
+            new ThreadFactoryBuilder()
+                .setDaemon(true)
+                .setPriority(Thread.MIN_PRIORITY)
+                .setNameFormat("DispatchThread")
+                .build());
+  }
+
+  public static WindmillStreamPoolSender create(
+      WindmillConnection connection,
+      GetWorkRequest getWorkRequest,
+      GetWorkBudget getWorkBudget,
+      GrpcWindmillStreamFactory streamingEngineStreamFactory,
+      WorkCommitter workCommitter,
+      GetDataClient getDataClient,
+      HeartbeatSender heartbeatSender,
+      StreamingWorkScheduler streamingWorkScheduler,
+      Runnable waitForResources,
+      Function<String, Optional<ComputationState>> computationStateFetcher) {
+    return new WindmillStreamPoolSender(
+        connection,
+        getWorkRequest,
+        new AtomicReference<>(getWorkBudget),
+        streamingEngineStreamFactory,
+        workCommitter,
+        getDataClient,
+        heartbeatSender,
+        streamingWorkScheduler,
+        waitForResources,
+        computationStateFetcher);
+  }
+
+  private void dispatchLoop() {
+    while (isRunning.get()) {
+      this.getWorkStream =

Review Comment:
   This is going to race with setBudget(); TSAN will complain.
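A minimal sketch of one way to remove the race, assuming `setBudget` is called from a thread other than the dispatch thread: keep the current stream and the latest budget behind one lock, and have the dispatch loop publish each new stream through that lock. `StreamBudgetHolder` and `BudgetedStream` are hypothetical; `BudgetedStream.setBudget(long, long)` stands in for whatever budget method the real `GetWorkStream` exposes.

```java
/**
 * Hypothetical sketch: keeps the active stream and the latest budget behind one lock so the
 * dispatch thread and budget updates from other threads never race on the stream field.
 */
final class StreamBudgetHolder {
  /** Stand-in for GetWorkStream's budget-setting method; not the real Beam interface. */
  interface BudgetedStream {
    void setBudget(long items, long bytes);
  }

  private final Object lock = new Object();
  // All three fields below are guarded by `lock`.
  private BudgetedStream currentStream;
  private long items;
  private long bytes;

  /** Called by the dispatch thread each time it opens a new stream. */
  void installStream(BudgetedStream stream) {
    synchronized (lock) {
      currentStream = stream;
      // Apply the latest budget so an update made between streams is not lost.
      stream.setBudget(items, bytes);
    }
  }

  /** Called from other threads when a new budget is distributed. */
  void setBudget(long newItems, long newBytes) {
    synchronized (lock) {
      items = newItems;
      bytes = newBytes;
      if (currentStream != null) {
        currentStream.setBudget(newItems, newBytes);
      }
    }
  }
}
```

Calling into the stream while holding the lock keeps budget updates ordered with stream swaps. If the real budget call can block, an alternative is to copy the reference under the lock and call it outside, at the cost of budget updates possibly reaching a stream out of order.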



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -243,17 +245,31 @@ private StreamingDataflowWorker(
     @Nullable ChannelzServlet channelzServlet = null;
     Consumer<PrintWriter> getDataStatusProvider;
     Supplier<Long> currentActiveCommitBytesProvider;
-
-    if (options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled()) {
-      // Direct path pipelines.
+    WindmillStreamPool<GetDataStream> getDataStreamPool =

Review Comment:
   But getDataStreamPool isn't used to construct ApplianceGetDataClient (we don't have streaming RPCs to the appliance).
   
   It seems we should avoid creating the streaming RPC pool in this case. Perhaps make getDataStreamPool nullable and inline the creation of getDataClient in this method.
   
   ```
   if (appliance) {
     getDataClient = ...
   } else {
     getDataStreamPool = ...
     getDataClient = ...
   }
   // existing cases
   ```
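A rough Java rendering of the pseudocode above, under the assumption that only the Streaming Engine branch needs a stream pool. `DataClientWiring`, `StreamPool`, `DataClient`, and the two client classes are hypothetical stand-ins for `WindmillStreamPool<GetDataStream>`, `GetDataClient`, and `ApplianceGetDataClient`. The sketch returns both values so they can be checked; in `StreamingDataflowWorker` they would stay locals, with `getDataStreamPool` marked `@Nullable`.

```java
import java.util.function.Supplier;

/** Hypothetical sketch of creating the GetData stream pool only when it is needed. */
final class DataClientWiring {
  /** Stand-in for WindmillStreamPool<GetDataStream>. */
  static final class StreamPool {}

  /** Stand-in for GetDataClient. */
  interface DataClient {}

  /** Stand-in for ApplianceGetDataClient: no stream pool involved. */
  static final class ApplianceDataClient implements DataClient {}

  /** Stand-in for a streaming GetDataClient backed by the pool. */
  static final class StreamingDataClient implements DataClient {
    final StreamPool pool;

    StreamingDataClient(StreamPool pool) {
      this.pool = pool;
    }
  }

  static final class Wiring {
    final StreamPool getDataStreamPool; // null when running against the appliance
    final DataClient getDataClient;

    Wiring(StreamPool getDataStreamPool, DataClient getDataClient) {
      this.getDataStreamPool = getDataStreamPool;
      this.getDataClient = getDataClient;
    }
  }

  private DataClientWiring() {}

  static Wiring wire(boolean isAppliance, Supplier<StreamPool> poolFactory) {
    if (isAppliance) {
      // No streaming RPCs to the appliance, so the pool is never created.
      return new Wiring(null, new ApplianceDataClient());
    }
    StreamPool getDataStreamPool = poolFactory.get();
    return new Wiring(getDataStreamPool, new StreamingDataClient(getDataStreamPool));
  }
}
```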



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamPoolSender.java:
##########
@@ -0,0 +1,234 @@
+  private void dispatchLoop() {
+    while (isRunning.get()) {
+      this.getWorkStream =
+          streamingEngineStreamFactory.createGetWorkStream(
+              connection.currentStub(), getWorkRequest, getWorkItemReceiver());
+      this.getWorkStream.start();
+      this.hasGetWorkStreamStarted.set(true);
+
+      try {
+        // Reconnect every now and again to enable better load balancing.
+        // If at any point the server closes the stream, we will reconnect immediately;
+        // otherwise
+        // we half-close the stream after some time and create a new one.
+        if (this.getWorkStream != null
+            && !this.getWorkStream.awaitTermination(
+                GET_WORK_STREAM_TIMEOUT_MINUTES, TimeUnit.MINUTES)) {
+          if (this.getWorkStream != null) {
+            this.getWorkStream.halfClose();
+          }
+        }
+      } catch (InterruptedException e) {
+        // Continue processing
+      }
+    }
+  }
+
+  private WorkItemReceiver getWorkItemReceiver() {

Review Comment:
   I don't really like the duplication here between this and SingleSourceWorkerHarness in SE mode.
   Could this just create a SingleSourceWorkerHarness and start it?
   
   It seems like it is just missing setBudget support, but perhaps that could be added to SingleSourceWorkerHarness? Alternatively, could we have a static method to construct this receiver that we share between the two classes?
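If the shared-static-method route is taken, a sketch might look like the following. `SharedReceivers`, `Receiver`, and the `create` signature are hypothetical, and the receiver is reduced to a computation ID plus an opaque work item; the real `WorkItemReceiver` takes more arguments. The behavior shown (drop work for computations the fetcher does not know, block on `waitForResources`, then schedule) is an assumption about what both harnesses need, not a copy of the existing code.

```java
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Hypothetical shared factory for the work-item receiver used by both harnesses. */
final class SharedReceivers {
  /** Simplified stand-in for WorkItemReceiver. */
  interface Receiver {
    void receiveWork(String computationId, String workItem);
  }

  private SharedReceivers() {}

  static <S> Receiver create(
      Function<String, Optional<S>> computationStateFetcher,
      Runnable waitForResources,
      BiConsumer<S, String> scheduleWork) {
    return (computationId, workItem) -> {
      Optional<S> state = computationStateFetcher.apply(computationId);
      if (!state.isPresent()) {
        // Assumed behavior: work for an unknown computation is dropped.
        return;
      }
      // Block until the worker has capacity, then hand the work to the scheduler.
      waitForResources.run();
      scheduleWork.accept(state.get(), workItem);
    };
  }
}
```

Both `WindmillStreamPoolSender` and `SingleSourceWorkerHarness` could then call the factory with their own `computationStateFetcher`, `waitForResources`, and scheduler instead of each defining the receiver inline.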
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -281,11 +293,14 @@ private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
   private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
     // Since this is run on a single threaded executor, multiple versions of the metadata maybe
     // queued up while a previous version of the windmillEndpoints were being consumed. Only consume
-    // the endpoints if they are the most current version.
+    // the endpoints if they are the most current version, or if the endpoint type is different
+    // from currently active endpoints.
     synchronized (metadataLock) {
-      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion
+          && newWindmillEndpoints.endpointType() == activeEndpointType) {

Review Comment:
   Ditto, this should be:
   
   (version < pending) || (version == pending && type == activeType)
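Written out, the suggested skip condition is the exact negation of the accept condition from the first comment, whereas the condition in the diff still lets an older version through when its type differs. A hypothetical sketch that checks this over a small grid; `EndpointSkipGate` and its methods are illustrative names. Both predicates take the same `pending` value here, while in the harness the two call sites may observe different values of `pendingMetadataVersion`, so this only checks the boolean logic.

```java
/** Hypothetical stand-in for the check in consumeWindmillWorkerEndpoints(). */
final class EndpointSkipGate {
  enum EndpointType {
    CLOUDPATH,
    DIRECTPATH
  }

  private EndpointSkipGate() {}

  /** Skip condition suggested in the review: stale version, or a duplicate of the active state. */
  static boolean shouldSkip(long version, long pending, EndpointType type, EndpointType active) {
    return version < pending || (version == pending && type == active);
  }

  /** Accept condition suggested for consumeWorkerMetadata(). */
  static boolean shouldProcess(long version, long pending, EndpointType type, EndpointType active) {
    return version > pending || (version == pending && type != active);
  }

  /** Skip condition as written in the diff; lets older versions through on a type change. */
  static boolean diffSkipCondition(
      long version, long pending, EndpointType type, EndpointType active) {
    return version < pending && type == active;
  }

  /** Returns true if shouldSkip and shouldProcess disagree on every point of a small grid. */
  static boolean gatesAreComplementary() {
    long pending = 5;
    for (long version = 3; version <= 7; version++) {
      for (EndpointType type : EndpointType.values()) {
        for (EndpointType active : EndpointType.values()) {
          if (shouldSkip(version, pending, type, active)
              == shouldProcess(version, pending, type, active)) {
            return false;
          }
        }
      }
    }
    return true;
  }
}
```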



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
