scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838119412
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -146,6 +146,7 @@ public void onNext(T value) throws StreamObserverCancelledException {
"Output channel stalled for {}s, outbound thread {}.",
totalSecondsWaited,
Thread.currentThread().getName());
+ Thread.dumpStack();
Review Comment:
If you want to submit this, log it instead. We've used things like
StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10);
elsewhere to get a string to log.
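
As a rough illustration of the idea, the stack can be rendered into a string and passed to the logger instead of being dumped to stderr. The helper below is hypothetical (it is not Beam's StringUtils, whose exact behavior is not shown here); it only assumes that a newline-joined list of at most N frames is what we want to log.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical stand-in for a stack-formatting utility; not Beam's StringUtils.
final class StackTraces {
  private StackTraces() {}

  // Renders at most maxFrames frames, one per line, in the order given.
  static String toNewlines(StackTraceElement[] frames, int maxFrames) {
    return Arrays.stream(frames)
        .limit(maxFrames)
        .map(StackTraceElement::toString)
        .collect(Collectors.joining("\n"));
  }
}
```

The result could then be added as one more argument to the existing LOG.info call, e.g. StackTraces.toNewlines(Thread.currentThread().getStackTrace(), 10).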
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java:
##########
@@ -472,7 +473,8 @@ private void flushResponse() {
responseObserver.onNext(responseBuilder.build());
} catch (Exception e) {
// Stream is already closed.
- System.out.println("trieu: " + e);
+ LOG.warn("trieu: ", e);
Review Comment:
Remove the debug logs.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -144,41 +149,57 @@ protected boolean hasPendingRequests() {
}
@Override
- public void sendHealthCheck() {
+ public void sendHealthCheck() throws WindmillStreamShutdownException {
if (hasPendingRequests()) {
StreamingCommitWorkRequest.Builder builder =
StreamingCommitWorkRequest.newBuilder();
builder.addCommitChunkBuilder().setRequestId(HEARTBEAT_REQUEST_ID);
- send(builder.build());
+ trySend(builder.build());
}
}
@Override
protected void onResponse(StreamingCommitResponse response) {
commitWorkThrottleTimer.stop();
- RuntimeException finalException = null;
+ CommitCompletionException failures = new CommitCompletionException();
for (int i = 0; i < response.getRequestIdCount(); ++i) {
Review Comment:
Ping for the comment above, since it might be lost in the GitHub comment
threads. I was thinking the builder would not be an exception itself, and the
exception would then just be a simple class without mutating methods.
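
A minimal sketch of that split, with hypothetical names (CommitFailedException and CommitFailureCollector are illustrative, not classes from this PR): the collector is the only mutable piece, and the exception is built once from an immutable snapshot.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable exception: no add/mutate methods after construction.
final class CommitFailedException extends RuntimeException {
  private final List<Throwable> causes;

  CommitFailedException(List<Throwable> causes) {
    super(causes.size() + " commit callback(s) failed", causes.isEmpty() ? null : causes.get(0));
    // Defensive copy so later changes to the caller's list are not visible here.
    this.causes = Collections.unmodifiableList(new ArrayList<>(causes));
  }

  List<Throwable> causes() {
    return causes;
  }
}

// Hypothetical collector: accumulates failures while iterating over the response.
final class CommitFailureCollector {
  private final List<Throwable> failures = new ArrayList<>();

  void add(Throwable t) {
    failures.add(t);
  }

  // Throws only if at least one failure was recorded.
  void throwIfAny() {
    if (!failures.isEmpty()) {
      throw new CommitFailedException(failures);
    }
  }
}
```

In onResponse the loop would call add(...) for each failing callback and throwIfAny() once after the loop.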
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -67,76 +72,129 @@ public DirectStreamObserver(
}
@Override
- public void onNext(T value) {
+ public void onNext(T value) throws StreamObserverCancelledException {
int awaitPhase = -1;
long totalSecondsWaited = 0;
long waitSeconds = 1;
while (true) {
try {
synchronized (lock) {
+ int currentPhase = isReadyNotifier.getPhase();
+ // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+ // are synchronized after terminating the phaser if we observe that the phaser is not
+ // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+ if (currentPhase < 0) return;
+
+ // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+ // careful to observe the phase before observing isReady.
+ if (awaitPhase < 0) {
+ awaitPhase = isReadyNotifier.getPhase();
+ // If getPhase() returns a value less than 0, the phaser has been terminated.
+ if (awaitPhase < 0) {
+ return;
+ }
+ }
+
// We only check isReady periodically to effectively allow for increasing the outbound
// buffer periodically. This reduces the overhead of blocking while still restricting
// memory because there is a limited # of streams, and we have a max messages size of 2MB.
if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
outboundObserver.onNext(value);
return;
}
- // If we awaited previously and timed out, wait for the same phase. Otherwise we're
- // careful to observe the phase before observing isReady.
- if (awaitPhase < 0) {
- awaitPhase = phaser.getPhase();
- }
+
if (outboundObserver.isReady()) {
messagesSinceReady = 0;
outboundObserver.onNext(value);
return;
}
}
+
// A callback has been registered to advance the phaser whenever the observer
// transitions to is ready. Since we are waiting for a phase observed before the
// outboundObserver.isReady() returned false, we expect it to advance after the
// channel has become ready. This doesn't always seem to be the case (despite
// documentation stating otherwise) so we poll periodically and enforce an overall
// timeout related to the stream deadline.
- phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+ int nextPhase =
+ isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+ // If nextPhase is a value less than 0, the phaser has been terminated.
+ if (nextPhase < 0) {
Review Comment:
Also check isClosed to ensure that we don't call onNext on something that we
have already called onCompleted or onError on.
We need it here because the locking in the ResettableStreamObserver isn't
sufficient, as it sends without the lock held. You might be able to remove it
from that class if we're relying on this one alone to enforce that the
underlying stream observer is used correctly (i.e. locked, just one terminal
method, no other methods after it).
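
Roughly what I have in mind for the send path, as a simplified sketch: Consumer stands in for the gRPC outbound observer, the class and method names are hypothetical, and the isReady/await logic is omitted. The point is only that the closed check and the send happen under the same lock.

```java
import java.util.concurrent.Phaser;
import java.util.function.Consumer;

// Hypothetical sketch: Consumer<T> stands in for the outbound StreamObserver.
final class GuardedSender<T> {
  private final Object lock = new Object();
  private final Phaser isReadyNotifier = new Phaser(1);
  private final Consumer<T> outbound;
  private boolean isClosed = false; // guarded by lock

  GuardedSender(Consumer<T> outbound) {
    this.outbound = outbound;
  }

  // Returns true if the value was handed to the outbound consumer.
  boolean send(T value) {
    synchronized (lock) {
      // Both checks happen under the lock, so a terminal call cannot slip in
      // between the check and the send.
      if (isClosed || isReadyNotifier.isTerminated()) {
        return false;
      }
      outbound.accept(value);
      return true;
    }
  }

  // Stand-in for the terminal methods: wake any waiters, then mark closed.
  void close() {
    isReadyNotifier.forceTermination();
    synchronized (lock) {
      isClosed = true;
    }
  }
}
```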
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -317,10 +320,13 @@ public String backendWorkerToken() {
return backendWorkerToken;
}
+ @SuppressWarnings("GuardedBy")
@Override
public final void shutdown() {
- // Don't lock on "this" before poisoning the request observer as allow IO to block shutdown.
+ // Don't lock on "this" before poisoning the request observer since otherwise the observer may
+ // be blocking in send().
requestObserver.poison();
+ isShutdown = true;
Review Comment:
We should remove this (and the suppression).
Shouldn't the poison prevent the blocking beneath the mutex, so that the lock
below will be acquired soon?
Setting it to true outside the mutex breaks invariants that are easier to
reason about if the field is strictly guarded by the mutex. (It also breaks
the logic below: we'll never run shutdownInternal.)
If we do need it for something, it seems like we could have a separate
volatile shutdownRequested boolean. But I'd prefer to figure out what gets
stuck with the current code and fix that instead, because it is confusing to
have two.
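
To illustrate the ordering, here is a sketch that assumes the code below this hunk flips isShutdown and runs shutdownInternal under the lock only when the flag was previously false (that part is not shown in the diff, so treat it as an assumption). The class name and the counter are hypothetical stand-ins.

```java
// Hypothetical sketch; the counter stands in for shutdownInternal().
final class ShutdownSketch {
  private final Runnable poison;
  private boolean isShutdown = false; // guarded by "this"
  private int shutdownInternalCalls = 0; // guarded by "this"

  ShutdownSketch(Runnable poison) {
    this.poison = poison;
  }

  void shutdown() {
    // Poison first, without the lock, so a sender blocked while holding the
    // lock is released and the synchronized block below can be entered.
    poison.run();
    synchronized (this) {
      // The flag is only written here, so the first caller always runs the
      // internal shutdown exactly once.
      if (!isShutdown) {
        isShutdown = true;
        shutdownInternalCalls++;
      }
    }
  }

  synchronized int shutdownInternalCalls() {
    return shutdownInternalCalls;
  }
}
```

If isShutdown were set to true before the synchronized block, the if (!isShutdown) branch would never be taken.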
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -67,76 +72,128 @@ public DirectStreamObserver(
}
@Override
- public void onNext(T value) {
+ public void onNext(T value) throws StreamObserverCancelledException {
int awaitPhase = -1;
long totalSecondsWaited = 0;
long waitSeconds = 1;
while (true) {
try {
synchronized (lock) {
+ int currentPhase = isReadyNotifier.getPhase();
+ // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+ // are synchronized after terminating the phaser if we observe that the phaser is not
+ // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+ if (currentPhase < 0) return;
+
+ // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+ // careful to observe the phase before observing isReady.
+ if (awaitPhase < 0) {
+ awaitPhase = isReadyNotifier.getPhase();
+ // If getPhase() returns a value less than 0, the phaser has been terminated.
+ if (awaitPhase < 0) {
+ return;
+ }
+ }
+
// We only check isReady periodically to effectively allow for increasing the outbound
// buffer periodically. This reduces the overhead of blocking while still restricting
// memory because there is a limited # of streams, and we have a max messages size of 2MB.
if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
outboundObserver.onNext(value);
return;
}
- // If we awaited previously and timed out, wait for the same phase. Otherwise we're
- // careful to observe the phase before observing isReady.
- if (awaitPhase < 0) {
- awaitPhase = phaser.getPhase();
- }
+
if (outboundObserver.isReady()) {
messagesSinceReady = 0;
outboundObserver.onNext(value);
return;
}
}
+
// A callback has been registered to advance the phaser whenever the observer
// transitions to is ready. Since we are waiting for a phase observed before the
// outboundObserver.isReady() returned false, we expect it to advance after the
// channel has become ready. This doesn't always seem to be the case (despite
// documentation stating otherwise) so we poll periodically and enforce an overall
// timeout related to the stream deadline.
- phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+ int nextPhase =
+ isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+ // If nextPhase is a value less than 0, the phaser has been terminated.
+ if (nextPhase < 0) {
+ throw new StreamObserverCancelledException("StreamObserver was terminated.");
+ }
+
synchronized (lock) {
+ int currentPhase = isReadyNotifier.getPhase();
+ // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+ // are synchronized after terminating the phaser if we observe that the phaser is not
+ // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+ if (currentPhase < 0) return;
messagesSinceReady = 0;
outboundObserver.onNext(value);
return;
}
} catch (TimeoutException e) {
totalSecondsWaited += waitSeconds;
if (totalSecondsWaited > deadlineSeconds) {
- LOG.error(
- "Exceeded timeout waiting for the outboundObserver to become ready meaning "
- + "that the stream deadline was not respected.");
- throw new RuntimeException(e);
+ String errorMessage = constructStreamCancelledErrorMessage(totalSecondsWaited);
+ LOG.error(errorMessage);
+ throw new StreamObserverCancelledException(errorMessage, e);
}
- if (totalSecondsWaited > 30) {
+
+ if (totalSecondsWaited > OUTPUT_CHANNEL_CONSIDERED_STALLED_SECONDS) {
LOG.info(
"Output channel stalled for {}s, outbound thread {}.",
totalSecondsWaited,
Thread.currentThread().getName());
}
+
waitSeconds = waitSeconds * 2;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ throw new StreamObserverCancelledException(e);
}
}
}
@Override
public void onError(Throwable t) {
+ isReadyNotifier.forceTermination();
synchronized (lock) {
+ isClosed = true;
Review Comment:
i.e. add an if (!isClosed) check to all of these.
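
For the terminal methods, something along these lines, as a sketch only: the delegate callbacks are hypothetical stand-ins for outboundObserver.onError and outboundObserver.onCompleted, and the phaser termination that precedes the lock in the real code is left out.

```java
import java.util.function.Consumer;

// Hypothetical sketch: forwards only the first terminal call to the delegate.
final class TerminalOnce {
  private final Object lock = new Object();
  private final Runnable delegateOnCompleted;
  private final Consumer<Throwable> delegateOnError;
  private boolean isClosed = false; // guarded by lock

  TerminalOnce(Runnable delegateOnCompleted, Consumer<Throwable> delegateOnError) {
    this.delegateOnCompleted = delegateOnCompleted;
    this.delegateOnError = delegateOnError;
  }

  void onError(Throwable t) {
    synchronized (lock) {
      if (!isClosed) {
        isClosed = true;
        delegateOnError.accept(t);
      }
    }
  }

  void onCompleted() {
    synchronized (lock) {
      if (!isClosed) {
        isClosed = true;
        delegateOnCompleted.run();
      }
    }
  }
}
```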