scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1843416690
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -132,6 +132,10 @@ private static Optional<HostAndPort> tryParseDirectEndpointIntoIpV6Address(
directEndpointAddress.getHostAddress(), (int) endpointProto.getPort()));
}
+ public final boolean isEmpty() {
+ return equals(none());
Review Comment:
How about changing none() to return a singleton instead of building a new
instance every time? That would help if we end up calling isEmpty() a lot.
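A minimal sketch of what I mean, using a simplified stand-in class.
EndpointsSketch, its String endpoints and of(...) are hypothetical and are not
the real WindmillEndpoints API; only none() and isEmpty() mirror the names in
this PR.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified stand-in for WindmillEndpoints (assumption, not the real class). */
final class EndpointsSketch {
  // Built once when the class is initialized and shared by every caller of none().
  private static final EndpointsSketch NONE =
      new EndpointsSketch(Collections.<String>emptySet());

  private final Set<String> endpoints;

  private EndpointsSketch(Set<String> endpoints) {
    // Defensive copy so instances stay immutable.
    this.endpoints = Collections.unmodifiableSet(new HashSet<>(endpoints));
  }

  static EndpointsSketch of(Set<String> endpoints) {
    return new EndpointsSketch(endpoints);
  }

  /** Returns the shared empty instance instead of building a new one per call. */
  static EndpointsSketch none() {
    return NONE;
  }

  /** Value comparison against the singleton; no allocation on this path. */
  boolean isEmpty() {
    return equals(NONE);
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof EndpointsSketch)) {
      return false;
    }
    return endpoints.equals(((EndpointsSketch) other).endpoints);
  }

  @Override
  public int hashCode() {
    return endpoints.hashCode();
  }
}
```

Since isEmpty() still goes through equals(), an empty instance that was built
some other way is also reported as empty.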
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -104,6 +104,10 @@ synchronized void poison() {
}
}
+ synchronized boolean hasReceivedPoisonPill() {
Review Comment:
nit: how about isPoisoned?
A poison pill was a special element we added to queues etc., so using the term
here is a little confusing to me.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -162,24 +170,23 @@ public void onNext(T value) throws StreamObserverCancelledException {
public void onError(Throwable t) {
isReadyNotifier.forceTermination();
synchronized (lock) {
- markClosedOrThrow();
- outboundObserver.onError(t);
+ if (!isClosed) {
Review Comment:
I think we should have
isUserClosed = onError or onCompleted called on this object
isClosed = onError or onCompleted called on outboundObserver
Could name them isClosed and isOutboundClosed or something if that is clearer.
Would be good to comment above too.
I think that onError/onCompleted should be like:
check(!isUserClosed);
isUserClosed = true;
if (!isClosed) {
outboundObserver.onError(t);
isClosed = true;
}
enforcing that only one of them is called. And terminate should be like
if (!isClosed) {
outboundObserver.onError(t);
isClosed = true;
}
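Put together, a rough sketch of the two-flag idea could look like the
following. TwoFlagObserverSketch, its Outbound interface and the
terminate(Throwable) signature are hypothetical stand-ins, not the real
DirectStreamObserver, and check(...) is written here as an
IllegalStateException.

```java
/** Hypothetical sketch of the two-flag scheme (assumption, not the real DirectStreamObserver). */
final class TwoFlagObserverSketch {
  /** Minimal stand-in for the outbound observer. */
  interface Outbound {
    void onError(Throwable t);

    void onCompleted();
  }

  private final Object lock = new Object();
  private final Outbound outboundObserver;

  // True once onError or onCompleted has been called on this object by its user.
  private boolean isUserClosed = false;
  // True once onError or onCompleted has been called on outboundObserver.
  private boolean isClosed = false;

  TwoFlagObserverSketch(Outbound outboundObserver) {
    this.outboundObserver = outboundObserver;
  }

  public void onError(Throwable t) {
    synchronized (lock) {
      markUserClosedOrThrow();
      if (!isClosed) {
        outboundObserver.onError(t);
        isClosed = true;
      }
    }
  }

  public void onCompleted() {
    synchronized (lock) {
      markUserClosedOrThrow();
      if (!isClosed) {
        outboundObserver.onCompleted();
        isClosed = true;
      }
    }
  }

  /** Internal shutdown path: closes the outbound side without counting as a user close. */
  public void terminate(Throwable t) {
    synchronized (lock) {
      if (!isClosed) {
        outboundObserver.onError(t);
        isClosed = true;
      }
    }
  }

  // Enforces that the user calls only one of onError/onCompleted, and only once.
  private void markUserClosedOrThrow() {
    if (isUserClosed) {
      throw new IllegalStateException("onError or onCompleted was already called.");
    }
    isUserClosed = true;
  }
}
```

With this shape, a user calling onError after terminate is accepted but not
forwarded, while a second user close still fails loudly.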
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/ResettableThrowingStreamObserver.java:
##########
@@ -125,6 +129,8 @@ public void onNext(T t) throws StreamClosedException, WindmillStreamShutdownExce
// If the delegate above was already terminated via onError or onComplete from another
// thread.
logger.warn("StreamObserver was previously cancelled.", e);
+ } catch (RuntimeException ignored) {
+ logger.warn("StreamObserver was unexpectedly cancelled.", e);
Review Comment:
I was thinking that above you should log both e and the currently ignored
exception; ditto here. Also, this log message is a little confusing. How about
"encountered error {} when cancelling due to error"?
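One possible way to keep both exceptions in a single log record is sketched
below. CancelLoggingSketch, Delegate and cancelAndLog are hypothetical names,
java.util.logging stands in for the logger used in this file so the snippet is
self-contained, and attaching e via addSuppressed is my own choice rather than
something the PR already does.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch (assumption): log the cancellation failure together with the original error. */
final class CancelLoggingSketch {
  private static final Logger LOG = Logger.getLogger(CancelLoggingSketch.class.getName());

  /** Minimal stand-in for the delegate observer being cancelled. */
  interface Delegate {
    void onError(Throwable t);
  }

  /** Returns the exception thrown while cancelling, or null if cancelling succeeded. */
  static RuntimeException cancelAndLog(Delegate delegate, Throwable e) {
    try {
      delegate.onError(e);
      return null;
    } catch (RuntimeException cancellationError) {
      // Attach the original error so one log record carries both stack traces.
      // A throwable cannot suppress itself, hence the identity check.
      if (cancellationError != e) {
        cancellationError.addSuppressed(e);
      }
      LOG.log(
          Level.WARNING,
          "Encountered error " + cancellationError + " when cancelling due to error",
          cancellationError);
      return cancellationError;
    }
  }
}
```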
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java:
##########
@@ -299,24 +303,39 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
}
/** Close the streams that are no longer valid asynchronously. */
- private void closeStreamsNotIn(WindmillEndpoints newWindmillEndpoints) {
+ @CanIgnoreReturnValue
+ private ImmutableList<CompletableFuture<Void>> closeStreamsNotIn(
+ WindmillEndpoints newWindmillEndpoints) {
StreamingEngineBackends currentBackends = backends.get();
- currentBackends.windmillStreams().entrySet().stream()
- .filter(
- connectionAndStream ->
- !newWindmillEndpoints.windmillEndpoints().contains(connectionAndStream.getKey()))
- .forEach(
- entry ->
- windmillStreamManager.execute(
- () -> closeStreamSender(entry.getKey(), entry.getValue())));
+ List<CompletableFuture<Void>> closeStreamFutures =
+ currentBackends.windmillStreams().entrySet().stream()
+ .filter(
+ connectionAndStream ->
+ !newWindmillEndpoints
+ .windmillEndpoints()
+ .contains(connectionAndStream.getKey()))
+ .map(
+ entry ->
+ CompletableFuture.runAsync(
+ () -> closeStreamSender(entry.getKey(), entry.getValue()),
+ windmillStreamManager))
+ .collect(Collectors.toList());
Set<Endpoint> newGlobalDataEndpoints =
new HashSet<>(newWindmillEndpoints.globalDataEndpoints().values());
- currentBackends.globalDataStreams().values().stream()
- .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
- .forEach(
- sender ->
- windmillStreamManager.execute(() -> closeStreamSender(sender.endpoint(), sender)));
+ List<CompletableFuture<Void>> closeGlobalDataStreamFutures =
+ currentBackends.globalDataStreams().values().stream()
+ .filter(sender -> !newGlobalDataEndpoints.contains(sender.endpoint()))
+ .map(
+ sender ->
+ CompletableFuture.runAsync(
+ () -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager))
+ .collect(Collectors.toList());
+
+ return ImmutableList.<CompletableFuture<Void>>builder()
Review Comment:
Do we want to return the list of futures, or would it be better to return a
single future by using CompletableFuture.allOf(..)?
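For illustration, a small sketch of the single-future variant. CloseAllSketch
and closeAll are hypothetical names, and the close tasks are plain Runnables
here rather than the real closeStreamSender calls.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch (assumption): combine per-stream close futures into one. */
final class CloseAllSketch {
  /** Runs every close task on the executor and returns one future covering all of them. */
  static CompletableFuture<Void> closeAll(List<Runnable> closeTasks, Executor executor) {
    CompletableFuture[] futures =
        closeTasks.stream()
            .map(task -> CompletableFuture.runAsync(task, executor))
            .toArray(CompletableFuture[]::new);
    // allOf completes once every input future has completed; it completes
    // exceptionally if any of them completed exceptionally.
    return CompletableFuture.allOf(futures);
  }
}
```

Callers that only need to wait for everything to close can then call join() or
chain on the one returned future instead of iterating over a list.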
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -89,6 +90,9 @@ public void onNext(T value) throws StreamObserverCancelledException {
throw new StreamObserverCancelledException("StreamObserver was terminated.");
}
+ // We close under "lock", so this should never happen.
Review Comment:
This needs a note about terminating the phaser first; update below too.
// We close under "lock" after terminating, so if we didn't observe termination above
// we shouldn't be closed.