scwhittle commented on code in PR #31504:
URL: https://github.com/apache/beam/pull/31504#discussion_r1638033694


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -129,8 +131,43 @@ public static GrpcGetDataStream create(
     return getDataStream;
   }
 
+  private static String createStreamCancelledErrorMessage(QueuedBatch batch) {
+    return batch.requests().stream()
+        .map(
+            request -> {
+              switch (request.getDataRequest().getKind()) {
+                case GLOBAL:
+                  return "GetSideInput=" + request.getDataRequest().global();
+                case COMPUTATION:
+                  return request.getDataRequest().computation().getRequestsList().stream()
+                      .map(
+                          keyedRequest ->
+                              "KeyedGetState=["
+                                  + "key="
+                                  + keyedRequest.getKey()

Review Comment:
   Avoid logging customer keys; they may be PII and they can also be large.
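
   A possible shape, as a sketch only (describeKeyedRequest is a hypothetical helper; the getters come from the KeyedGetDataRequest proto used elsewhere in this diff):

   private static String describeKeyedRequest(Windmill.KeyedGetDataRequest keyedRequest) {
     // Identify the request by its non-PII, fixed-size identifiers instead of the key bytes.
     return "KeyedGetState=[shardingKey="
         + keyedRequest.getShardingKey()
         + ", workToken="
         + keyedRequest.getWorkToken()
         + ", cacheToken="
         + keyedRequest.getCacheToken()
         + "]";
   }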



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -237,32 +244,124 @@ private StreamingDataflowWorker(
     dispatchThread.setName("DispatchThread");
     this.clientId = clientId;
     this.windmillServer = windmillServer;
+
+    WindmillStreamPool<WindmillStream.GetDataStream> getDataStreamPool =
+        WindmillStreamPool.create(
+            Math.max(1, options.getWindmillGetDataStreamCount()),
+            GET_DATA_STREAM_TIMEOUT,
+            windmillServer::getDataStream);
+
     this.metricTrackingWindmillServer =
         MetricTrackingWindmillServerStub.builder(windmillServer, memoryMonitor)
             .setUseStreamingRequests(windmillServiceEnabled)
-            .setUseSeparateHeartbeatStreams(options.getUseSeparateWindmillHeartbeatStreams())
-            .setNumGetDataStreams(options.getWindmillGetDataStreamCount())
+            .setGetDataStreamPool(getDataStreamPool)
             .build();
 
     // Register standard file systems.
     FileSystems.setDefaultPipelineOptions(options);
 
+    WorkerStatusPages workerStatusPages =
+        WorkerStatusPages.create(DEFAULT_STATUS_PORT, memoryMonitor);
+
+    this.streamingCounters = streamingCounters;
+    this.memoryMonitor = memoryMonitor;
+
+    this.streamingEngineClient = null;
+    this.streamingWorkScheduler =
+        StreamingWorkScheduler.create(
+            options,
+            clock,
+            readerCache,
+            mapTaskExecutorFactory,
+            workUnitExecutor,
+            stateCache::forComputation,
+            request ->
+                streamingEngineClient != null
+                    ? metricTrackingWindmillServer.getSideInputData(
+                        streamingEngineClient.getGlobalDataStream(request.getDataId().getTag()),
+                        request)
+                    : metricTrackingWindmillServer.getSideInputData(request),
+            failureTracker,
+            workFailureProcessor,
+            streamingCounters,
+            hotKeyLogger,
+            sampler,
+            maxWorkItemCommitBytes,
+            ID_GENERATOR,
+            stageInfoMap);
+
     int stuckCommitDurationMillis =
         windmillServiceEnabled && options.getStuckCommitDurationMillis() > 0
             ? options.getStuckCommitDurationMillis()
             : 0;
+    if (isDirectPathPipeline(options)) {
+      this.streamingEngineClient =

Review Comment:
   For SE non-direct path, can we still use StreamingEngineClient but just hard-code the single dispatcher as the worker endpoint? It seems like that would allow us to remove streamingDispatchLoop and other duplication, and it also seems like we want that to work if we need to fall back from non-direct path.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -263,6 +305,22 @@ public void sendHealthCheck() {
     }
   }
 
+  @Override
+  public synchronized void close() {
+    super.close();
+
+    // Stream has been explicitly closed. Drain pending input streams and request batches.
+    // Future calls to send RPCs will fail.
+    pending.values().forEach(AppendableInputStream::cancel);
+    pending.clear();

Review Comment:
   Clearing pending here differs from the cancellation in onNewStream, and I think doing so could lead to verify errors in onResponse.
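
   A sketch of the cancel-without-clear shape this would imply (mirroring onNewStream; pending and AppendableInputStream are from this diff):

   @Override
   public synchronized void close() {
     super.close();

     // Cancel pending input streams but leave them in the map, as onNewStream does,
     // so a late onResponse can still look up the cancelled entry instead of
     // tripping a verify on a missing id. Future calls to send RPCs will fail.
     pending.values().forEach(AppendableInputStream::cancel);
   }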



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -129,8 +131,43 @@ public static GrpcGetDataStream create(
     return getDataStream;
   }
 
+  private static String createStreamCancelledErrorMessage(QueuedBatch batch) {
+    return batch.requests().stream()
+        .map(
+            request -> {
+              switch (request.getDataRequest().getKind()) {
+                case GLOBAL:
+                  return "GetSideInput=" + request.getDataRequest().global();
+                case COMPUTATION:
+                  return request.getDataRequest().computation().getRequestsList().stream()
+                      .map(
+                          keyedRequest ->
+                              "KeyedGetState=["
+                                  + "key="
+                                  + keyedRequest.getKey()

Review Comment:
   But I'm also not sure it's worth having all this information in the exception; each key can log itself when its read finishes with the error instead.
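
   Roughly, at the issueRequest call site (sketch only; LOG and a non-PII describe() on QueuedRequest are hypothetical):

   try {
     tryQueueRequestAndWaitForSend(request);
     return parseFn.parse(request.getResponseStream());
   } catch (WindmillStreamClosedException e) {
     // Each request logs its own failure, so the batch-level exception can stay terse.
     LOG.error("GetData request {} failed because the stream was closed.", request.describe(), e);
     throw e;
   }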



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -308,7 +364,12 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<Respon
     }
   }
 
-  private void queueRequestAndWait(QueuedRequest request) throws InterruptedException {
+  private void tryQueueRequestAndWaitForSend(QueuedRequest request) throws InterruptedException {
+    if (isClosed()) {

Review Comment:
   Move beneath the synchronized block?
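
   Something like this sketch (the batches lock and the enqueue details are assumptions about the surrounding class):

   private void tryQueueRequestAndWaitForSend(QueuedRequest request) throws InterruptedException {
     synchronized (batches) {
       // Checking isClosed() under the same lock that guards batching prevents
       // close() from slipping in between the check and the enqueue.
       if (isClosed()) {
         throw new WindmillStreamClosedException("Stream was closed when queuing request.");
       }
       // ... enqueue the request into the current or a new batch ...
     }
     // ... wait outside the lock for the batch to be sent ...
   }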



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -291,14 +349,12 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<Respon
     while (true) {
       request.resetResponseStream();
       try {
-        queueRequestAndWait(request);
+        tryQueueRequestAndWaitForSend(request);
         return parseFn.parse(request.getResponseStream());
       } catch (CancellationException e) {
         // Retry issuing the request since the response stream was cancelled.
-        continue;
       } catch (IOException e) {

Review Comment:
   Maybe it would help to document that this is expected by explicitly catching the WindmillStreamClosedException and rethrowing it? You could also verify isClosed() if you get that exception.
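
   As a sketch against the retry loop above (the verify is an assumption about the intended invariant):

   while (true) {
     request.resetResponseStream();
     try {
       tryQueueRequestAndWaitForSend(request);
       return parseFn.parse(request.getResponseStream());
     } catch (WindmillStreamClosedException e) {
       // Expected once the stream is shut down: catch explicitly to document it,
       // check the invariant, and propagate to the caller.
       verify(isClosed(), "WindmillStreamClosedException thrown while the stream was open.");
       throw e;
     } catch (CancellationException e) {
       // Retry issuing the request since the response stream was cancelled.
     } catch (IOException e) {
       // ... existing handling ...
     }
   }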



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -278,47 +278,82 @@ public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request)
     }
   }
 
-  /** Tells windmill processing is ongoing for the given keys. */
-  public void refreshActiveWork(Map<String, List<HeartbeatRequest>> heartbeats) {
+  public Windmill.GlobalData getSideInputData(
+      GetDataStream getDataStream, Windmill.GlobalDataRequest request) {
+    gcThrashingMonitor.waitForResources("GetSideInputData");
+    activeSideInputs.getAndIncrement();
+    try {
+      return getDataStream.requestGlobalData(request);
+    } catch (Exception e) {

Review Comment:
   Does this need to handle WindmillStreamClosedException?
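
   One possibility, sketched (letting the closed-stream case propagate distinctly instead of being wrapped as a generic RuntimeException):

   try {
     return getDataStream.requestGlobalData(request);
   } catch (WindmillStreamClosedException e) {
     // Surface the closed stream as-is so the caller can retry on a fresh
     // stream from the pool rather than failing the side input read outright.
     throw e;
   } catch (Exception e) {
     throw new RuntimeException("Failed to get side input: ", e);
   } finally {
     activeSideInputs.getAndDecrement();
   }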



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/WindmillStreamClosedException.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client;
+
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+public final class WindmillStreamClosedException extends RuntimeException {
+
+  public WindmillStreamClosedException(String message) {
+    super(message);
+  }
+
+  /** Returns whether an exception was caused by a {@link WindmillStreamClosedException}. */
+  public static boolean isWindmillStreamCancelledException(Throwable t) {

Review Comment:
   Update the method name. Actually, since this is static and invoked via the class name, maybe the method should just be wasCauseOf so it reads like:
   
   WindmillStreamClosedException.wasCauseOf(e);
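
   A minimal sketch of that helper (walking the cause chain; including the throwable itself is a judgment call):

   /** Returns whether {@code t} or any of its causes is a {@link WindmillStreamClosedException}. */
   public static boolean wasCauseOf(Throwable t) {
     for (Throwable current = t; current != null; current = current.getCause()) {
       if (current instanceof WindmillStreamClosedException) {
         return true;
       }
     }
     return false;
   }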



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/HeartbeatSender.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+
+/** Interface for sending heartbeats. */
+@FunctionalInterface
+public interface HeartbeatSender {
+  void sendHeartbeats(Map<String, List<Windmill.HeartbeatRequest>> heartbeats);
+
+  default boolean isInvalid() {

Review Comment:
   Can we remove this and just handle it internally in the stream? Either way, checking this and then sending on the stream is racy, so the stream will have to handle it anyway.
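
   The race, sketched from the caller's side (hypothetical call site):

   if (!sender.isInvalid()) {
     // Another thread can invalidate the sender right here, between the check
     // and the send, so sendHeartbeats must handle a closed stream regardless.
     sender.sendHeartbeats(heartbeats);
   }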



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/MetricTrackingWindmillServerStub.java:
##########
@@ -278,47 +278,82 @@ public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request)
     }
   }
 
-  /** Tells windmill processing is ongoing for the given keys. */
-  public void refreshActiveWork(Map<String, List<HeartbeatRequest>> heartbeats) {
+  public Windmill.GlobalData getSideInputData(
+      GetDataStream getDataStream, Windmill.GlobalDataRequest request) {
+    gcThrashingMonitor.waitForResources("GetSideInputData");
+    activeSideInputs.getAndIncrement();
+    try {
+      return getDataStream.requestGlobalData(request);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get side input: ", e);
+    } finally {
+      activeSideInputs.getAndDecrement();
+    }
+  }
+
+  public WindmillStreamPool<GetDataStream> getGetDataStreamPool() {
+    return getDataStreamPool;
+  }
+
+  /**
+   * Attempts to refresh active work, fanning out to each {@link GetDataStream} in parallel.
+   *
+   * @implNote Skips closed {@link GetDataStream}(s).
+   */
+  public void refreshActiveWork(
+      Map<HeartbeatSender, Map<String, List<HeartbeatRequest>>> heartbeats) {
     if (heartbeats.isEmpty()) {
       return;
     }
-    activeHeartbeats.set(heartbeats.size());
+
     try {
-      if (useStreamingRequests) {
-        GetDataStream stream = heartbeatStreamPool.getStream();
-        try {
-          stream.refreshActiveWork(heartbeats);
-        } finally {
-          heartbeatStreamPool.releaseStream(stream);
-        }
-      } else {
-        // This code path is only used by appliance which sends heartbeats (used to refresh active
-        // work) as KeyedGetDataRequests. So we must translate the HeartbeatRequest to a
-        // KeyedGetDataRequest here regardless of the value of sendKeyedGetDataRequests.
-        Windmill.GetDataRequest.Builder builder = Windmill.GetDataRequest.newBuilder();
-        for (Map.Entry<String, List<HeartbeatRequest>> entry : heartbeats.entrySet()) {
-          Windmill.ComputationGetDataRequest.Builder perComputationBuilder =
-              Windmill.ComputationGetDataRequest.newBuilder();
-          perComputationBuilder.setComputationId(entry.getKey());
-          for (HeartbeatRequest request : entry.getValue()) {
-            perComputationBuilder.addRequests(
-                Windmill.KeyedGetDataRequest.newBuilder()
-                    .setShardingKey(request.getShardingKey())
-                    .setWorkToken(request.getWorkToken())
-                    .setCacheToken(request.getCacheToken())
-                    .addAllLatencyAttribution(request.getLatencyAttributionList())
-                    .build());
-          }
-          builder.addRequests(perComputationBuilder.build());
-        }
-        server.getData(builder.build());
+      // There is 1 destination to send heartbeat requests.
+      if (heartbeats.size() == 1) {
+        Map.Entry<HeartbeatSender, Map<String, List<HeartbeatRequest>>> heartbeat =
+            Iterables.getOnlyElement(heartbeats.entrySet());
+        HeartbeatSender sender = heartbeat.getKey();
+        sender.sendHeartbeats(heartbeat.getValue());
+      }
+
+      // There are multiple destinations to send heartbeat requests. Fan out requests in parallel.

Review Comment:
   Nit: I think
   
   else {
     // comment
   
   is easier to read.
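
   Applied to the hunk above, roughly (the fan-out body is elided where the diff is truncated):

   if (heartbeats.size() == 1) {
     // Single destination: send directly on the only sender.
     Map.Entry<HeartbeatSender, Map<String, List<HeartbeatRequest>>> heartbeat =
         Iterables.getOnlyElement(heartbeats.entrySet());
     heartbeat.getKey().sendHeartbeats(heartbeat.getValue());
   } else {
     // Multiple destinations: fan out heartbeat requests in parallel.
     // ... parallel fan-out ...
   }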



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
