scwhittle commented on code in PR #32774:
URL: https://github.com/apache/beam/pull/32774#discussion_r1838119412


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -146,6 +146,7 @@ public void onNext(T value) throws StreamObserverCancelledException {
               "Output channel stalled for {}s, outbound thread {}.",
               totalSecondsWaited,
               Thread.currentThread().getName());
+          Thread.dumpStack();

Review Comment:
   If you want to submit this, log it instead. We've used things like 
StringUtils.arrayToNewlines(Thread.currentThread().getStackTrace(), 10); 
elsewhere to get a string to log.
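
For illustration, a minimal self-contained sketch of turning the current stack trace into a bounded string for a logger, rather than printing it with Thread.dumpStack(). StackTraceFormatter and firstFrames are hypothetical names, not Beam's StringUtils; the output format is an assumption.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical helper (not Beam's StringUtils): renders the first maxLines
// frames of a stack trace as a newline-separated string for a logger.
final class StackTraceFormatter {
  private StackTraceFormatter() {}

  // Assumes maxLines >= 0. Frames beyond maxLines are summarized in a trailer.
  static String firstFrames(StackTraceElement[] frames, int maxLines) {
    String shown =
        Arrays.stream(frames)
            .limit(maxLines)
            .map(frame -> "  at " + frame)
            .collect(Collectors.joining("\n"));
    int omitted = frames.length - Math.min(frames.length, maxLines);
    return omitted > 0 ? shown + "\n  ... " + omitted + " more" : shown;
  }

  public static void main(String[] args) {
    // In the worker this string would go to the logger rather than stdout.
    System.out.println(
        "Output channel stalled, stack:\n"
            + firstFrames(Thread.currentThread().getStackTrace(), 10));
  }
}
```

Capping the number of frames keeps a repeatedly emitted stall message from flooding the log.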



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java:
##########
@@ -472,7 +473,8 @@ private void flushResponse() {
                     responseObserver.onNext(responseBuilder.build());
                   } catch (Exception e) {
                     // Stream is already closed.
-                    System.out.println("trieu: " + e);
+                    LOG.warn("trieu: ", e);

Review Comment:
   Remove the debug logs.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -144,41 +149,57 @@ protected boolean hasPendingRequests() {
   }
 
   @Override
-  public void sendHealthCheck() {
+  public void sendHealthCheck() throws WindmillStreamShutdownException {
     if (hasPendingRequests()) {
       StreamingCommitWorkRequest.Builder builder = StreamingCommitWorkRequest.newBuilder();
       builder.addCommitChunkBuilder().setRequestId(HEARTBEAT_REQUEST_ID);
-      send(builder.build());
+      trySend(builder.build());
     }
   }
 
   @Override
   protected void onResponse(StreamingCommitResponse response) {
     commitWorkThrottleTimer.stop();
 
-    RuntimeException finalException = null;
+    CommitCompletionException failures = new CommitCompletionException();
     for (int i = 0; i < response.getRequestIdCount(); ++i) {

Review Comment:
   Ping on the comment above, since it might be lost in the GitHub comment 
threads. I was thinking the builder would not be an exception itself, and the 
exception would then be a simple class without mutating methods.
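
A rough sketch of that split, assuming failures are collected as Throwables: a plain collector accumulates them, and the exception it builds is immutable. CommitFailureCollector, failures() and the message text are hypothetical; only the name CommitCompletionException comes from the diff.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Immutable exception: its state is fixed at construction, no mutators.
final class CommitCompletionException extends RuntimeException {
  private final List<Throwable> failures;

  // Assumes a non-empty list; the first failure is kept as the cause.
  CommitCompletionException(List<Throwable> failures) {
    super("Commit completion failed with " + failures.size() + " failure(s)", failures.get(0));
    this.failures = Collections.unmodifiableList(new ArrayList<>(failures));
  }

  List<Throwable> failures() {
    return failures;
  }
}

// Hypothetical builder-style collector. It is a plain class, not an exception.
final class CommitFailureCollector {
  private final List<Throwable> failures = new ArrayList<>();

  void add(Throwable failure) {
    failures.add(failure);
  }

  boolean hasFailures() {
    return !failures.isEmpty();
  }

  CommitCompletionException build() {
    if (failures.isEmpty()) {
      throw new IllegalStateException("No failures recorded.");
    }
    return new CommitCompletionException(failures);
  }
}
```

In onResponse, the loop would call add(...) for each failing callback and throw build() at the end only if hasFailures() returns true.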



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -67,76 +72,129 @@ public DirectStreamObserver(
   }
 
   @Override
-  public void onNext(T value) {
+  public void onNext(T value) throws StreamObserverCancelledException {
     int awaitPhase = -1;
     long totalSecondsWaited = 0;
     long waitSeconds = 1;
     while (true) {
       try {
         synchronized (lock) {
+          int currentPhase = isReadyNotifier.getPhase();
+          // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+          // are synchronized after terminating the phaser if we observe that the phaser is not
+          // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+          if (currentPhase < 0) return;
+
+          // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+          // careful to observe the phase before observing isReady.
+          if (awaitPhase < 0) {
+            awaitPhase = isReadyNotifier.getPhase();
+            // If getPhase() returns a value less than 0, the phaser has been terminated.
+            if (awaitPhase < 0) {
+              return;
+            }
+          }
+
           // We only check isReady periodically to effectively allow for increasing the outbound
           // buffer periodically. This reduces the overhead of blocking while still restricting
           // memory because there is a limited # of streams, and we have a max messages size of 2MB.
           if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
             outboundObserver.onNext(value);
             return;
           }
-          // If we awaited previously and timed out, wait for the same phase. Otherwise we're
-          // careful to observe the phase before observing isReady.
-          if (awaitPhase < 0) {
-            awaitPhase = phaser.getPhase();
-          }
+
           if (outboundObserver.isReady()) {
             messagesSinceReady = 0;
             outboundObserver.onNext(value);
             return;
           }
         }
+
         // A callback has been registered to advance the phaser whenever the observer
         // transitions to  is ready. Since we are waiting for a phase observed before the
         // outboundObserver.isReady() returned false, we expect it to advance after the
         // channel has become ready.  This doesn't always seem to be the case (despite
         // documentation stating otherwise) so we poll periodically and enforce an overall
         // timeout related to the stream deadline.
-        phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+        int nextPhase =
+            isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+        // If nextPhase is a value less than 0, the phaser has been terminated.
+        if (nextPhase < 0) {

Review Comment:
   Also check isClosed here to ensure that we don't call onNext on an observer 
that we have already called onCompleted or onError on.
   
   We need it here because the locking in the ResettableStreamObserver isn't 
sufficient: it sends without the lock held. You might be able to remove it from 
that class if we're relying on this one alone to enforce that the underlying 
stream observer is used correctly (i.e. locked, just one terminal method, no 
other methods after it).
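
As a sketch of the enforcement described above, assuming a single lock and an isClosed flag: every call goes through the lock, onNext refuses to run after a terminal call, and only the first terminal call reaches the delegate. SimpleObserver is a hypothetical stand-in for gRPC's StreamObserver so the snippet compiles on its own, and IllegalStateException stands in for StreamObserverCancelledException.

```java
// Hypothetical stand-in for io.grpc.stub.StreamObserver.
interface SimpleObserver<T> {
  void onNext(T value);

  void onError(Throwable t);

  void onCompleted();
}

// Wrapper that guards the delegate: locked access, one terminal call only,
// and no onNext after a terminal call.
final class ClosedGuardObserver<T> implements SimpleObserver<T> {
  private final Object lock = new Object();
  private final SimpleObserver<T> delegate;
  private boolean isClosed = false; // guarded by lock

  ClosedGuardObserver(SimpleObserver<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void onNext(T value) {
    synchronized (lock) {
      if (isClosed) {
        // Stand-in for StreamObserverCancelledException.
        throw new IllegalStateException("Observer is already closed.");
      }
      delegate.onNext(value);
    }
  }

  @Override
  public void onError(Throwable t) {
    synchronized (lock) {
      if (!isClosed) {
        isClosed = true;
        delegate.onError(t);
      }
    }
  }

  @Override
  public void onCompleted() {
    synchronized (lock) {
      if (!isClosed) {
        isClosed = true;
        delegate.onCompleted();
      }
    }
  }
}
```

Because the check and the delegate call happen under the same lock, a terminal call cannot slip in between them.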



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -317,10 +320,13 @@ public String backendWorkerToken() {
     return backendWorkerToken;
   }
 
+  @SuppressWarnings("GuardedBy")
   @Override
   public final void shutdown() {
-    // Don't lock on "this" before poisoning the request observer as allow IO to block shutdown.
+    // Don't lock on "this" before poisoning the request observer since otherwise the observer may
+    // be blocking in send().
     requestObserver.poison();
+    isShutdown = true;

Review Comment:
   We should remove this (and the suppression).
   
   Shouldn't the poison prevent the blocking beneath the mutex, so that the 
lock below is acquired soon?
   
   Setting it to true outside the mutex breaks invariants that are easier to 
reason about if the field is strictly guarded by the mutex. (It also breaks the 
logic below: we'll never run shutdownInternal.)
   
   If we do need it for something, it seems like we could have a separate 
volatile shutdownRequested boolean. But I'd prefer to figure out what gets 
stuck with the current code and fix that instead, because having two flags is 
confusing.
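
To make the invariant concrete, a minimal sketch under the assumption that the rest of shutdown() has the shape "if not yet shut down, mark it and run shutdownInternal()". ShutdownGuardSketch and its fields are hypothetical, and an AtomicBoolean stands in for requestObserver.poison().

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the stream class; only the shutdown path is modeled.
final class ShutdownGuardSketch {
  // Stand-in for requestObserver.poison(): a flag that blocked senders observe.
  private final AtomicBoolean poisoned = new AtomicBoolean(false);
  private boolean isShutdown = false; // guarded by "this"
  private int shutdownInternalCalls = 0; // guarded by "this"

  void shutdown() {
    // Poison without holding the lock, so a sender blocked under the lock
    // can give up and release it.
    poisoned.set(true);
    synchronized (this) {
      // isShutdown is written only here, under the lock. Setting it before
      // this block would make the check below fail and skip shutdownInternal().
      if (!isShutdown) {
        isShutdown = true;
        shutdownInternal();
      }
    }
  }

  // Called with the lock held.
  private void shutdownInternal() {
    shutdownInternalCalls++;
  }

  synchronized int shutdownInternalCalls() {
    return shutdownInternalCalls;
  }

  boolean isPoisoned() {
    return poisoned.get();
  }
}
```

With the flag written only under the lock, repeated shutdown() calls run shutdownInternal() exactly once.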



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/observers/DirectStreamObserver.java:
##########
@@ -67,76 +72,128 @@ public DirectStreamObserver(
   }
 
   @Override
-  public void onNext(T value) {
+  public void onNext(T value) throws StreamObserverCancelledException {
     int awaitPhase = -1;
     long totalSecondsWaited = 0;
     long waitSeconds = 1;
     while (true) {
       try {
         synchronized (lock) {
+          int currentPhase = isReadyNotifier.getPhase();
+          // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+          // are synchronized after terminating the phaser if we observe that the phaser is not
+          // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+          if (currentPhase < 0) return;
+
+          // If we awaited previously and timed out, wait for the same phase. Otherwise we're
+          // careful to observe the phase before observing isReady.
+          if (awaitPhase < 0) {
+            awaitPhase = isReadyNotifier.getPhase();
+            // If getPhase() returns a value less than 0, the phaser has been terminated.
+            if (awaitPhase < 0) {
+              return;
+            }
+          }
+
           // We only check isReady periodically to effectively allow for increasing the outbound
           // buffer periodically. This reduces the overhead of blocking while still restricting
           // memory because there is a limited # of streams, and we have a max messages size of 2MB.
           if (++messagesSinceReady <= messagesBetweenIsReadyChecks) {
             outboundObserver.onNext(value);
             return;
           }
-          // If we awaited previously and timed out, wait for the same phase. Otherwise we're
-          // careful to observe the phase before observing isReady.
-          if (awaitPhase < 0) {
-            awaitPhase = phaser.getPhase();
-          }
+
           if (outboundObserver.isReady()) {
             messagesSinceReady = 0;
             outboundObserver.onNext(value);
             return;
           }
         }
+
         // A callback has been registered to advance the phaser whenever the observer
         // transitions to  is ready. Since we are waiting for a phase observed before the
         // outboundObserver.isReady() returned false, we expect it to advance after the
         // channel has become ready.  This doesn't always seem to be the case (despite
         // documentation stating otherwise) so we poll periodically and enforce an overall
         // timeout related to the stream deadline.
-        phaser.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+        int nextPhase =
+            isReadyNotifier.awaitAdvanceInterruptibly(awaitPhase, waitSeconds, TimeUnit.SECONDS);
+        // If nextPhase is a value less than 0, the phaser has been terminated.
+        if (nextPhase < 0) {
+          throw new StreamObserverCancelledException("StreamObserver was terminated.");
+        }
+
         synchronized (lock) {
+          int currentPhase = isReadyNotifier.getPhase();
+          // Phaser is terminated so don't use the outboundObserver. Since onError and onCompleted
+          // are synchronized after terminating the phaser if we observe that the phaser is not
+          // terminated the onNext calls below are guaranteed to not be called on a closed observer.
+          if (currentPhase < 0) return;
           messagesSinceReady = 0;
           outboundObserver.onNext(value);
           return;
         }
       } catch (TimeoutException e) {
         totalSecondsWaited += waitSeconds;
         if (totalSecondsWaited > deadlineSeconds) {
-          LOG.error(
-              "Exceeded timeout waiting for the outboundObserver to become ready meaning "
-                  + "that the stream deadline was not respected.");
-          throw new RuntimeException(e);
+          String errorMessage = constructStreamCancelledErrorMessage(totalSecondsWaited);
+          LOG.error(errorMessage);
+          throw new StreamObserverCancelledException(errorMessage, e);
         }
-        if (totalSecondsWaited > 30) {
+
+        if (totalSecondsWaited > OUTPUT_CHANNEL_CONSIDERED_STALLED_SECONDS) {
           LOG.info(
               "Output channel stalled for {}s, outbound thread {}.",
               totalSecondsWaited,
               Thread.currentThread().getName());
         }
+
         waitSeconds = waitSeconds * 2;
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
+        throw new StreamObserverCancelledException(e);
       }
     }
   }
 
   @Override
   public void onError(Throwable t) {
+    isReadyNotifier.forceTermination();
     synchronized (lock) {
+      isClosed = true;

Review Comment:
   i.e. add an if (!isClosed) check to all of these.


