[ 
https://issues.apache.org/jira/browse/BEAM-13015?focusedWorklogId=762707&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762707
 ]

ASF GitHub Bot logged work on BEAM-13015:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Apr/22 04:55
            Start Date: 27/Apr/22 04:55
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on code in PR #17358:
URL: https://github.com/apache/beam/pull/17358#discussion_r859380353


##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,41 +66,47 @@ public DirectStreamObserver(Phaser phaser, 
CallStreamObserver<T> outboundObserve
 
   @Override
   public void onNext(T value) {
-    if (maxMessagesBeforeCheck <= 1
-        || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
-      int waitTime = 1;
-      int totalTimeWaited = 0;
-      int phase = phaser.getPhase();
-      while (!outboundObserver.isReady()) {
-        try {
-          phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
-        } catch (TimeoutException e) {
-          totalTimeWaited += waitTime;
-          waitTime = waitTime * 2;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
+    synchronized (outboundObserver) {
+      if (++numMessages >= maxMessagesBeforeCheck) {
+        numMessages = 0;
+        int waitTime = 1;
+        int totalTimeWaited = 0;
+        int phase = phaser.getPhase();
+        // Record the initial phase in case we are in the inbound gRPC thread 
where the phase won't
+        // advance.
+        int initialPhase = phase;
+        while (!outboundObserver.isReady()) {
+          try {
+            phaser.awaitAdvanceInterruptibly(phase, waitTime, 
TimeUnit.SECONDS);
+          } catch (TimeoutException e) {
+            totalTimeWaited += waitTime;
+            waitTime = waitTime * 2;
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+          // There is a chance that we were spuriously woken up but the 
outboundObserver is no

Review Comment:
   done



##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,41 +66,47 @@ public DirectStreamObserver(Phaser phaser, 
CallStreamObserver<T> outboundObserve
 
   @Override
   public void onNext(T value) {
-    if (maxMessagesBeforeCheck <= 1
-        || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
-      int waitTime = 1;
-      int totalTimeWaited = 0;
-      int phase = phaser.getPhase();
-      while (!outboundObserver.isReady()) {
-        try {
-          phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
-        } catch (TimeoutException e) {
-          totalTimeWaited += waitTime;
-          waitTime = waitTime * 2;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
+    synchronized (outboundObserver) {
+      if (++numMessages >= maxMessagesBeforeCheck) {
+        numMessages = 0;
+        int waitTime = 1;
+        int totalTimeWaited = 0;
+        int phase = phaser.getPhase();
+        // Record the initial phase in case we are in the inbound gRPC thread 
where the phase won't
+        // advance.
+        int initialPhase = phase;
+        while (!outboundObserver.isReady()) {
+          try {
+            phaser.awaitAdvanceInterruptibly(phase, waitTime, 
TimeUnit.SECONDS);

Review Comment:
   done





Issue Time Tracking
-------------------

    Worklog Id:     (was: 762707)
    Time Spent: 78h 10m  (was: 78h)

> Optimize Java SDK harness
> -------------------------
>
>                 Key: BEAM-13015
>                 URL: https://issues.apache.org/jira/browse/BEAM-13015
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-harness
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: P2
>          Time Spent: 78h 10m
>  Remaining Estimate: 0h
>
> Use profiling tools to remove bundle processing overhead in the SDK harness.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to