m-trieu commented on code in PR #32775: URL: https://github.com/apache/beam/pull/32775#discussion_r1807972205
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java: ########## @@ -280,29 +300,30 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi } /** Close the streams that are no longer valid asynchronously. */ - @SuppressWarnings("FutureReturnValueIgnored") - private void closeStaleStreams(WindmillEndpoints newWindmillEndpoints) { + private void closeStreamsNotIn(WindmillEndpoints newWindmillEndpoints) { StreamingEngineBackends currentBackends = backends.get(); - ImmutableMap<Endpoint, WindmillStreamSender> currentWindmillStreams = - currentBackends.windmillStreams(); - currentWindmillStreams.entrySet().stream() + currentBackends.windmillStreams().entrySet().stream() .filter( connectionAndStream -> !newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey())) .forEach( - entry -> - CompletableFuture.runAsync( - () -> closeStreamSender(entry.getKey(), entry.getValue()), - windmillStreamManager)); + entry -> { + CompletableFuture<Void> ignored = Review Comment: done ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java: ########## @@ -280,29 +300,30 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi } /** Close the streams that are no longer valid asynchronously. */ - @SuppressWarnings("FutureReturnValueIgnored") - private void closeStaleStreams(WindmillEndpoints newWindmillEndpoints) { + private void closeStreamsNotIn(WindmillEndpoints newWindmillEndpoints) { StreamingEngineBackends currentBackends = backends.get(); - ImmutableMap<Endpoint, WindmillStreamSender> currentWindmillStreams = - currentBackends.windmillStreams(); - currentWindmillStreams.entrySet().stream() + currentBackends.windmillStreams().entrySet().stream() .filter( connectionAndStream -> !newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey())) .forEach( - entry -> - CompletableFuture.runAsync( - () -> closeStreamSender(entry.getKey(), entry.getValue()), - windmillStreamManager)); + entry -> { + CompletableFuture<Void> ignored = + CompletableFuture.runAsync( + () -> closeStreamSender(entry.getKey(), entry.getValue()), + windmillStreamManager); + }); Set<Endpoint> newGlobalDataEndpoints = new HashSet<>(newWindmillEndpoints.globalDataEndpoints().values()); currentBackends.globalDataStreams().values().stream() .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint())) .forEach( - sender -> - CompletableFuture.runAsync( - () -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager)); + sender -> { + CompletableFuture<Void> ignored = Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org