[ https://issues.apache.org/jira/browse/BEAM-13015?focusedWorklogId=762761&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762761 ]

ASF GitHub Bot logged work on BEAM-13015:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Apr/22 09:11
            Start Date: 27/Apr/22 09:11
    Worklog Time Spent: 10m 
      Work Description: scwhittle commented on code in PR #17358:
URL: https://github.com/apache/beam/pull/17358#discussion_r859565520


##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,55 +67,58 @@ public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserve
 
   @Override
   public void onNext(T value) {
-    if (maxMessagesBeforeCheck <= 1
-        || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
-      int waitTime = 1;
-      int totalTimeWaited = 0;
-      int phase = phaser.getPhase();
-      while (!outboundObserver.isReady()) {
-        try {
-          phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
-        } catch (TimeoutException e) {
-          totalTimeWaited += waitTime;
-          waitTime = waitTime * 2;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
+    synchronized (lock) {
+      if (++numMessages >= maxMessagesBeforeCheck) {
+        numMessages = 0;
+        int waitTime = 1;
+        int totalTimeWaited = 0;
+        int phase = phaser.getPhase();
+        // Record the initial phase in case we are in the inbound gRPC thread where the phase won't
+        // advance.
+        int initialPhase = phase;
+        while (!outboundObserver.isReady()) {
+          try {
+            phase = phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
+          } catch (TimeoutException e) {
+            totalTimeWaited += waitTime;
+            waitTime = waitTime * 2;
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
         }
-      }
-      if (totalTimeWaited > 0) {
-        // If the phase didn't change, this means that the installed onReady callback had not
-        // been invoked.
-        if (phase == phaser.getPhase()) {
-          LOG.info(
-              "Output channel stalled for {}s, outbound thread {}. See: "
-                  + "https://issues.apache.org/jira/browse/BEAM-4280 for the history for "
-                  + "this issue.",
-              totalTimeWaited,
-              Thread.currentThread().getName());
-        } else {
-          LOG.debug(
-              "Output channel stalled for {}s, outbound thread {}.",
-              totalTimeWaited,
-              Thread.currentThread().getName());
+        if (totalTimeWaited > 0) {
+          // If the phase didn't change, this means that the installed onReady callback had not
+          // been invoked.
+          if (initialPhase == phaser.getPhase()) {

Review Comment:
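   I think you could just use `phase` instead of `phaser.getPhase()` here, since `phase` is already reassigned from the return value of `awaitAdvanceInterruptibly` in the loop above.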
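
   A minimal sketch of why that comparison should work, not the PR's actual code. It assumes only the documented `java.util.concurrent.Phaser` behavior: `awaitAdvanceInterruptibly` returns the next arrival phase number when the phaser has advanced past the given phase, and throws `TimeoutException` (leaving the local variable untouched) otherwise. The class `PhaseTrackingSketch`, the `WaitResult` holder, and the two scenario methods are hypothetical names for illustration, and a `BooleanSupplier` stands in for `outboundObserver.isReady()`.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical illustration; not the DirectStreamObserver code from the PR.
public class PhaseTrackingSketch {

  /** Phase seen before waiting, last phase returned by the phaser, and number of timeouts. */
  public static final class WaitResult {
    public final int initialPhase;
    public final int lastPhase;
    public final int timeouts;

    WaitResult(int initialPhase, int lastPhase, int timeouts) {
      this.initialPhase = initialPhase;
      this.lastPhase = lastPhase;
      this.timeouts = timeouts;
    }
  }

  /** Mirrors the shape of the wait loop in the diff; isReady stands in for the observer. */
  public static WaitResult awaitReady(Phaser phaser, BooleanSupplier isReady, long waitMillis)
      throws InterruptedException {
    int phase = phaser.getPhase();
    int initialPhase = phase;
    int timeouts = 0;
    while (!isReady.getAsBoolean()) {
      try {
        // Returns the next arrival phase number once the phaser has moved past `phase`.
        phase = phaser.awaitAdvanceInterruptibly(phase, waitMillis, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        timeouts++; // `phase` keeps its previous value on timeout
      }
    }
    return new WaitResult(initialPhase, phase, timeouts);
  }

  /** The simulated callback advances the phaser during the first readiness check. */
  public static WaitResult advancedScenario() throws InterruptedException {
    Phaser phaser = new Phaser(1);
    AtomicInteger checks = new AtomicInteger();
    return awaitReady(
        phaser,
        () -> {
          if (checks.getAndIncrement() == 0) {
            phaser.arrive(); // simulates an onReady callback advancing the phase
            return false;
          }
          return true;
        },
        1000);
  }

  /** Nothing ever advances the phaser; readiness flips after two failed checks. */
  public static WaitResult stalledScenario() throws InterruptedException {
    Phaser phaser = new Phaser(1);
    AtomicInteger checks = new AtomicInteger();
    return awaitReady(phaser, () -> checks.getAndIncrement() >= 2, 10);
  }

  public static void main(String[] args) throws InterruptedException {
    WaitResult advanced = advancedScenario();
    WaitResult stalled = stalledScenario();
    System.out.println("advanced: " + advanced.initialPhase + " -> " + advanced.lastPhase);
    System.out.println("stalled:  " + stalled.initialPhase + " -> " + stalled.lastPhase);
  }
}
```

   In the stalled scenario the tracked `phase` still equals `initialPhase`, so `initialPhase == phase` detects the missing callback without a second `getPhase()` call. One caveat: a phase advance that lands right after a timeout, with no further call into the phaser, would be visible to `getPhase()` but not to the local variable.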





Issue Time Tracking
-------------------

    Worklog Id:     (was: 762761)
    Time Spent: 78.5h  (was: 78h 20m)

> Optimize Java SDK harness
> -------------------------
>
>                 Key: BEAM-13015
>                 URL: https://issues.apache.org/jira/browse/BEAM-13015
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-harness
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: P2
>          Time Spent: 78.5h
>  Remaining Estimate: 0h
>
> Use profiling tools to remove bundle processing overhead in the SDK harness.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
