scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1843416690


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -132,6 +132,10 @@ private static Optional<HostAndPort> tryParseDirectEndpointIntoIpV6Address(
             directEndpointAddress.getHostAddress(), (int) endpointProto.getPort()));
   }
 
+  public final boolean isEmpty() {
+    return equals(none());

Review Comment:
   How about changing none() to return a singleton instead of building a new instance every time? We might be calling isEmpty() a lot.
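A minimal sketch of the singleton suggestion, assuming a simplified, hypothetical stand-in class `Endpoints` (the real `WindmillEndpoints` has more fields and may be built differently, e.g. with AutoValue). `none()` returns one cached instance, so `isEmpty()` no longer allocates on every call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified stand-in for WindmillEndpoints; only none()/isEmpty() is shown. */
final class Endpoints {
  // Built once during class initialization; none() returns this same instance every time.
  private static final Endpoints NONE = new Endpoints(Collections.<String>emptyList());

  private final List<String> endpoints;

  private Endpoints(List<String> endpoints) {
    // Defensive copy so later changes to the caller's list cannot leak in.
    this.endpoints = Collections.unmodifiableList(new ArrayList<>(endpoints));
  }

  static Endpoints of(List<String> endpoints) {
    return new Endpoints(endpoints);
  }

  static Endpoints none() {
    return NONE; // no allocation per call
  }

  /** Value-based, so an empty instance built via of() also counts as empty. */
  boolean isEmpty() {
    return equals(NONE);
  }

  List<String> endpoints() {
    return endpoints;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof Endpoints)) {
      return false;
    }
    return endpoints.equals(((Endpoints) o).endpoints);
  }

  @Override
  public int hashCode() {
    return endpoints.hashCode();
  }
}
```

Keeping `isEmpty()` value-based rather than `this == NONE` is a deliberate choice in this sketch: instances that happen to be empty but were built elsewhere are still reported as empty.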



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -104,6 +104,10 @@ synchronized void poison() {
     }
   }
 
+  synchronized boolean hasReceivedPoisonPill() {

Review Comment:
   nit: how about isPoisoned?
   
   "Poison pill" was a special element we added to queues etc., so using the term here is a little confusing to me.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -162,24 +170,23 @@ public void onNext(T value) throws StreamObserverCancelledException {
   public void onError(Throwable t) {
     isReadyNotifier.forceTermination();
     synchronized (lock) {
-      markClosedOrThrow();
-      outboundObserver.onError(t);
+      if (!isClosed) {

Review Comment:
   I think we should have:
   isUserClosed = onError or onCompleted has been called on this object
   isClosed = onError or onCompleted has been called on outboundObserver
   We could name them isClosed and isOutboundClosed instead, or something similar, if that is clearer. It would also be good to document them with a comment above the fields.
   
   I think that onError/onCompleted should be like:
   
   checkState(!isUserClosed);
   isUserClosed = true;
   if (!isClosed) {
     outboundObserver.onError(t);
     isClosed = true; 
   }
   
   This enforces that only one of them is called. And terminate() should be like:
   
   if (!isClosed) {
     outboundObserver.onError(t);
     isClosed = true;
   }
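A minimal sketch of the two-flag scheme proposed in the comment above. It assumes a small hypothetical `SimpleObserver` interface in place of gRPC's `StreamObserver`, and the class name `ClosingObserver` is illustrative; only the field names `isUserClosed` / `isOutboundClosed` and the `onError` / `onCompleted` / `terminate` logic come from the comment.

```java
/** Hypothetical stand-in for gRPC's StreamObserver, to keep the sketch self-contained. */
interface SimpleObserver<T> {
  void onNext(T value);

  void onError(Throwable t);

  void onCompleted();
}

/** Hypothetical sketch of the proposed close-state tracking. */
final class ClosingObserver<T> implements SimpleObserver<T> {
  private final Object lock = new Object();
  private final SimpleObserver<T> outboundObserver;

  // True once onError or onCompleted has been called on this object by its user.
  private boolean isUserClosed = false; // guarded by lock
  // True once onError or onCompleted has been called on outboundObserver.
  private boolean isOutboundClosed = false; // guarded by lock

  ClosingObserver(SimpleObserver<T> outboundObserver) {
    this.outboundObserver = outboundObserver;
  }

  @Override
  public void onNext(T value) {
    synchronized (lock) {
      if (isOutboundClosed) {
        throw new IllegalStateException("Stream already closed.");
      }
      outboundObserver.onNext(value);
    }
  }

  @Override
  public void onError(Throwable t) {
    synchronized (lock) {
      markUserClosed();
      if (!isOutboundClosed) {
        outboundObserver.onError(t);
        isOutboundClosed = true;
      }
    }
  }

  @Override
  public void onCompleted() {
    synchronized (lock) {
      markUserClosed();
      if (!isOutboundClosed) {
        outboundObserver.onCompleted();
        isOutboundClosed = true;
      }
    }
  }

  /** Closes only the outbound side; safe to call repeatedly and before or after user close. */
  void terminate(Throwable t) {
    synchronized (lock) {
      if (!isOutboundClosed) {
        outboundObserver.onError(t);
        isOutboundClosed = true;
      }
    }
  }

  // Enforces that the user calls only one of onError/onCompleted, exactly once.
  private void markUserClosed() {
    if (isUserClosed) {
      throw new IllegalStateException("onError or onCompleted was already called.");
    }
    isUserClosed = true;
  }
}
```

With this split, a `terminate()` from another thread never makes the user's later `onCompleted()` fail, while a second user-side close still trips the check.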



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -125,6 +129,8 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
         // If the delegate above was already terminated via onError or onComplete from another
         // thread.
         logger.warn("StreamObserver was previously cancelled.", e);
+      } catch (RuntimeException ignored) {
+        logger.warn("StreamObserver was unexpectedly cancelled.", e);

Review Comment:
   I was thinking above that you should log both e and the currently ignored exception.
   
   Ditto here. Also, this log message is a little confusing; how about
   "encountered error {} when cancelling due to error"?

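A minimal sketch of logging both exceptions, using `java.util.logging` so it runs without dependencies; the helper `CancelOnError.cancelLoggingBoth` is hypothetical. The secondary (cancellation) error is formatted into the message and the original error is attached as the thrown exception, so neither is dropped. If the logger in the PR is SLF4J (an assumption here), the equivalent would be `logger.warn("Encountered error {} when cancelling due to error.", ignored, e)`, since SLF4J treats a trailing `Throwable` argument that has no matching placeholder as the exception to log.

```java
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/** Hypothetical helper; the class and method names are illustrative, not from the PR. */
final class CancelOnError {
  private static final Logger LOG = Logger.getLogger(CancelOnError.class.getName());

  /**
   * Runs {@code cancel} because of {@code cause}. If cancelling throws, logs one warning that
   * names the secondary error in the message and carries the original cause as the thrown
   * exception. Returns the logged record, or null if cancelling succeeded.
   */
  static LogRecord cancelLoggingBoth(Throwable cause, Consumer<Throwable> cancel) {
    try {
      cancel.accept(cause);
      return null;
    } catch (RuntimeException cancelError) {
      // java.util.logging uses MessageFormat-style {0} placeholders (SLF4J uses {}).
      LogRecord record =
          new LogRecord(Level.WARNING, "Encountered error {0} when cancelling due to error.");
      record.setLoggerName(LOG.getName());
      record.setParameters(new Object[] {cancelError});
      record.setThrown(cause); // full stack trace of the original error
      LOG.log(record);
      return record;
    }
  }
}
```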


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -299,24 +303,39 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
   }
 
   /** Close the streams that are no longer valid asynchronously. */
-  private void closeStreamsNotIn(WindmillEndpoints newWindmillEndpoints) {
+  @CanIgnoreReturnValue
+  private ImmutableList<CompletableFuture<Void>> closeStreamsNotIn(
+      WindmillEndpoints newWindmillEndpoints) {
     StreamingEngineBackends currentBackends = backends.get();
-    currentBackends.windmillStreams().entrySet().stream()
-        .filter(
-            connectionAndStream ->
-                !newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey()))
-        .forEach(
-            entry ->
-                windmillStreamManager.execute(
-                    () -> closeStreamSender(entry.getKey(), entry.getValue())));
+    List<CompletableFuture<Void>> closeStreamFutures =
+        currentBackends.windmillStreams().entrySet().stream()
+            .filter(
+                connectionAndStream ->
+                    !newWindmillEndpoints
+                        .windmillEndpoints()
+                        .contains(connectionAndStream.getKey()))
+            .map(
+                entry ->
+                    CompletableFuture.runAsync(
+                        () -> closeStreamSender(entry.getKey(), entry.getValue()),
+                        windmillStreamManager))
+            .collect(Collectors.toList());
 
     Set<Endpoint> newGlobalDataEndpoints =
         new HashSet<>(newWindmillEndpoints.globalDataEndpoints().values());
-    currentBackends.globalDataStreams().values().stream()
-        .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
-        .forEach(
-            sender ->
-                windmillStreamManager.execute(() -> closeStreamSender(sender.endpoint(), sender)));
+    List<CompletableFuture<Void>> closeGlobalDataStreamFutures =
+        currentBackends.globalDataStreams().values().stream()
+            .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
+            .map(
+                sender ->
+                    CompletableFuture.runAsync(
+                        () -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager))
+            .collect(Collectors.toList());
+
+    return ImmutableList.<CompletableFuture<Void>>builder()

Review Comment:
   Do we want to return the list of futures, or would it be better to return a single future by using CompletableFuture.allOf(..)?
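A minimal sketch of the `allOf` variant, assuming the close work is already expressed as a list of `Runnable`s; the helper `CloseAll.closeAllAsync` is hypothetical. In the PR the tasks come from two separate stream maps and would be combined first.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch; names are illustrative, not the PR's actual code. */
final class CloseAll {
  /**
   * Starts one async close task per stream on {@code executor} and returns a single future that
   * completes once all of them have completed (exceptionally if any of them failed).
   */
  static CompletableFuture<Void> closeAllAsync(List<Runnable> closeTasks, Executor executor) {
    CompletableFuture<?>[] futures =
        closeTasks.stream()
            .map(task -> CompletableFuture.runAsync(task, executor))
            .toArray(CompletableFuture<?>[]::new);
    // With no futures, allOf returns an already-completed future.
    return CompletableFuture.allOf(futures);
  }
}
```

Returning the single future gives callers one completion signal to wait on; returning the list is only useful if a caller needs to inspect individual streams.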



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -89,6 +90,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
             throw new StreamObserverCancelledException("StreamObserver was terminated.");
           }
 
+          // We close under "lock", so this should never happen.

Review Comment:
   This needs a note about terminating the phaser first; update the comment below too. Something like:
   
   // We close under "lock" after terminating, so if we didn't observe termination above
   // we shouldn't be closed.
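A minimal sketch of the ordering the suggested comment describes, with a hypothetical class `TerminateThenClose`. The closing path terminates the phaser first and only then sets `isClosed` under `lock`. The invariant "not terminated implies not closed" holds only if the termination check also happens while holding `lock`; that placement is an assumption of this sketch, not something confirmed from the PR.

```java
import java.util.concurrent.Phaser;

/** Hypothetical sketch of the "terminate first, then close under lock" ordering. */
final class TerminateThenClose {
  private final Object lock = new Object();
  private final Phaser isReadyNotifier = new Phaser(1);
  private boolean isClosed = false; // guarded by lock

  /** Shutdown path: terminate the phaser first, then mark closed under lock. */
  void close() {
    isReadyNotifier.forceTermination();
    synchronized (lock) {
      isClosed = true;
    }
  }

  /** Returns true if a value could be sent, false if termination was observed. */
  boolean trySend() {
    synchronized (lock) {
      if (isReadyNotifier.isTerminated()) {
        return false; // treat as cancelled
      }
      // We close under "lock" after terminating, so if we didn't observe termination above
      // we shouldn't be closed.
      if (isClosed) {
        throw new IllegalStateException("Closed without terminating the phaser first.");
      }
      return true;
    }
  }
}
```

Because `forceTermination()` precedes the locked write of `isClosed`, any thread that later acquires `lock` and sees `isClosed == true` must also see the phaser as terminated.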



-- 
This is an automated message from the Apache Git Service.