scwhittle commented on code in PR #17358:
URL: https://github.com/apache/beam/pull/17358#discussion_r859565520


##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,55 +67,58 @@ public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserve
 
   @Override
   public void onNext(T value) {
-    if (maxMessagesBeforeCheck <= 1
-        || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
-      int waitTime = 1;
-      int totalTimeWaited = 0;
-      int phase = phaser.getPhase();
-      while (!outboundObserver.isReady()) {
-        try {
-          phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
-        } catch (TimeoutException e) {
-          totalTimeWaited += waitTime;
-          waitTime = waitTime * 2;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
+    synchronized (lock) {
+      if (++numMessages >= maxMessagesBeforeCheck) {
+        numMessages = 0;
+        int waitTime = 1;
+        int totalTimeWaited = 0;
+        int phase = phaser.getPhase();
+        // Record the initial phase in case we are in the inbound gRPC thread where the phase won't
+        // advance.
+        int initialPhase = phase;
+        while (!outboundObserver.isReady()) {
+          try {
+            phase = phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
+          } catch (TimeoutException e) {
+            totalTimeWaited += waitTime;
+            waitTime = waitTime * 2;
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
         }
-      }
-      if (totalTimeWaited > 0) {
-        // If the phase didn't change, this means that the installed onReady callback had not
-        // been invoked.
-        if (phase == phaser.getPhase()) {
-          LOG.info(
-              "Output channel stalled for {}s, outbound thread {}. See: "
-                  + "https://issues.apache.org/jira/browse/BEAM-4280 for the history for "
-                  + "this issue.",
-              totalTimeWaited,
-              Thread.currentThread().getName());
-        } else {
-          LOG.debug(
-              "Output channel stalled for {}s, outbound thread {}.",
-              totalTimeWaited,
-              Thread.currentThread().getName());
+        if (totalTimeWaited > 0) {
+          // If the phase didn't change, this means that the installed onReady callback had not
+          // been invoked.
+          if (initialPhase == phaser.getPhase()) {

Review Comment:
   I think you could just use `phase` instead of `phaser.getPhase()` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to