m-trieu commented on code in PR #32775:
URL: https://github.com/apache/beam/pull/32775#discussion_r1807972205


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -280,29 +300,30 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
   }
 
   /** Close the streams that are no longer valid asynchronously. */
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void closeStaleStreams(WindmillEndpoints newWindmillEndpoints) {
+  private void closeStreamsNotIn(WindmillEndpoints newWindmillEndpoints) {
     StreamingEngineBackends currentBackends = backends.get();
-    ImmutableMap<Endpoint, WindmillStreamSender> currentWindmillStreams =
-        currentBackends.windmillStreams();
-    currentWindmillStreams.entrySet().stream()
+    currentBackends.windmillStreams().entrySet().stream()
         .filter(
             connectionAndStream ->
                 !newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey()))
         .forEach(
-            entry ->
-                CompletableFuture.runAsync(
-                    () -> closeStreamSender(entry.getKey(), entry.getValue()),
-                    windmillStreamManager));
+            entry -> {
+              CompletableFuture<Void> ignored =

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -280,29 +300,30 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
   }
 
   /** Close the streams that are no longer valid asynchronously. */
-  @SuppressWarnings("FutureReturnValueIgnored")
-  private void closeStaleStreams(WindmillEndpoints newWindmillEndpoints) {
+  private void closeStreamsNotIn(WindmillEndpoints newWindmillEndpoints) {
     StreamingEngineBackends currentBackends = backends.get();
-    ImmutableMap<Endpoint, WindmillStreamSender> currentWindmillStreams =
-        currentBackends.windmillStreams();
-    currentWindmillStreams.entrySet().stream()
+    currentBackends.windmillStreams().entrySet().stream()
         .filter(
             connectionAndStream ->
                 !newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey()))
         .forEach(
-            entry ->
-                CompletableFuture.runAsync(
-                    () -> closeStreamSender(entry.getKey(), entry.getValue()),
-                    windmillStreamManager));
+            entry -> {
+              CompletableFuture<Void> ignored =
+                  CompletableFuture.runAsync(
+                      () -> closeStreamSender(entry.getKey(), entry.getValue()),
+                      windmillStreamManager);
+            });
 
     Set<Endpoint> newGlobalDataEndpoints =
         new HashSet<>(newWindmillEndpoints.globalDataEndpoints().values());
     currentBackends.globalDataStreams().values().stream()
         .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
         .forEach(
-            sender ->
-                CompletableFuture.runAsync(
-                    () -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager));
+            sender -> {
+              CompletableFuture<Void> ignored =

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to