[ 
https://issues.apache.org/jira/browse/BEAM-13015?focusedWorklogId=759990&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-759990
 ]

ASF GitHub Bot logged work on BEAM-13015:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Apr/22 12:43
            Start Date: 21/Apr/22 12:43
    Worklog Time Spent: 10m 
      Work Description: scwhittle commented on code in PR #17358:
URL: https://github.com/apache/beam/pull/17358#discussion_r855137624


##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,41 +66,47 @@ public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserve
 
   @Override
   public void onNext(T value) {
-    if (maxMessagesBeforeCheck <= 1
-        || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
-      int waitTime = 1;
-      int totalTimeWaited = 0;
-      int phase = phaser.getPhase();
-      while (!outboundObserver.isReady()) {
-        try {
-          phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
-        } catch (TimeoutException e) {
-          totalTimeWaited += waitTime;
-          waitTime = waitTime * 2;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
+    synchronized (outboundObserver) {
+      if (++numMessages >= maxMessagesBeforeCheck) {
+        numMessages = 0;
+        int waitTime = 1;
+        int totalTimeWaited = 0;
+        int phase = phaser.getPhase();
+        // Record the initial phase in case we are in the inbound gRPC thread where the phase won't
+        // advance.
+        int initialPhase = phase;
+        while (!outboundObserver.isReady()) {
+          try {
+            phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
+          } catch (TimeoutException e) {
+            totalTimeWaited += waitTime;
+            waitTime = waitTime * 2;
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+          }
+          // There is a chance that we were spuriously woken up but the outboundObserver is no

Review Comment:
   I think the await call returns the current phase. Can you use that instead?
   
   nit: the wakeup doesn't have to be spurious. The observer could have been
   ready, and thus notified, but is no longer ready for some unrelated reason.
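
A minimal sketch of the suggestion, assuming only the documented behavior of `java.util.concurrent.Phaser`: `awaitAdvanceInterruptibly` returns the next arrival phase number, so the loop can carry that value forward rather than keeping a stale one. The class and the helper `awaitNextPhase` are hypothetical names for illustration and are not part of the PR.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class PhaseTracking {
  /**
   * Waits up to timeoutMillis for the phaser to advance past the given phase.
   * Returns the phase reported by the phaser, or the unchanged phase on timeout.
   * (Hypothetical helper, not taken from DirectStreamObserver.)
   */
  static int awaitNextPhase(Phaser phaser, int phase, long timeoutMillis)
      throws InterruptedException {
    try {
      // The return value is the next arrival phase number, so no separate
      // getPhase() call is needed after waking up.
      return phaser.awaitAdvanceInterruptibly(phase, timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      // No advance happened within the timeout; keep waiting on the same phase.
      return phase;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Phaser phaser = new Phaser(1);
    int phase = phaser.getPhase();
    phaser.arrive(); // the single registered party arrives, so the phase advances
    phase = awaitNextPhase(phaser, phase, 100);
    System.out.println("phase after advance: " + phase);
  }
}
```

Whether the wakeup was spurious or a real notification, the caller still has to re-check `isReady()` after the call returns. The returned phase only tells it which phase to wait on next.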
   
   



##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,41 +66,47 @@ public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserve
 
   @Override
   public void onNext(T value) {
-    if (maxMessagesBeforeCheck <= 1
-        || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
-      int waitTime = 1;
-      int totalTimeWaited = 0;
-      int phase = phaser.getPhase();
-      while (!outboundObserver.isReady()) {
-        try {
-          phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
-        } catch (TimeoutException e) {
-          totalTimeWaited += waitTime;
-          waitTime = waitTime * 2;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
+    synchronized (outboundObserver) {
+      if (++numMessages >= maxMessagesBeforeCheck) {
+        numMessages = 0;
+        int waitTime = 1;
+        int totalTimeWaited = 0;
+        int phase = phaser.getPhase();
+        // Record the initial phase in case we are in the inbound gRPC thread where the phase won't
+        // advance.
+        int initialPhase = phase;
+        while (!outboundObserver.isReady()) {
+          try {
+            phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);

Review Comment:
   One possible difference is that we now block waiting for isReady to
   transition while synchronizing on the outbound observer.
   Would it be safer to synchronize on something internal to the
   DirectStreamObserver instead, in case the outbound observer is synchronized
   upon elsewhere? For example, what if the outbound observer (sometimes?)
   synchronizes on itself to transition from isReady=false to isReady=true, but
   not for onNext? Or does gRPC document synchronization for these observers, so
   that it doesn't matter?
   
   If you don't think that is a concern for some reason, LGTM.
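
To illustrate the alternative raised above, here is a rough sketch of a wrapper that synchronizes on a private lock object rather than on the wrapped observer. It assumes nothing about gRPC: `Sink`, `RecordingSink`, and `LockingSink` are hypothetical stand-ins and not the real `CallStreamObserver` or `DirectStreamObserver`.

```java
import java.util.ArrayList;
import java.util.List;

final class PrivateLockSketch {
  /** Hypothetical stand-in for an outbound observer; not the gRPC interface. */
  interface Sink<T> {
    void accept(T value);
  }

  /** Simple delegate that records what it receives. */
  static final class RecordingSink implements Sink<String> {
    final List<String> received = new ArrayList<>();

    @Override
    public void accept(String value) {
      received.add(value);
    }
  }

  /** Wrapper that serializes writes on a lock no other code can reach. */
  static final class LockingSink<T> implements Sink<T> {
    private final Object lock = new Object();
    private final Sink<T> delegate;

    LockingSink(Sink<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public void accept(T value) {
      // Synchronize on the private lock, never on the delegate itself, so code
      // that holds the delegate's monitor cannot block this writer.
      synchronized (lock) {
        delegate.accept(value);
      }
    }
  }

  /** Returns true if a write completes while the caller holds the delegate's monitor. */
  static boolean writeCompletesWhileDelegateMonitorHeld() throws InterruptedException {
    RecordingSink delegate = new RecordingSink();
    Sink<String> wrapper = new LockingSink<>(delegate);
    Thread writer = new Thread(() -> wrapper.accept("x"));
    synchronized (delegate) {
      writer.start();
      writer.join(5000);
      return !writer.isAlive() && delegate.received.size() == 1;
    }
  }
}
```

With `synchronized (delegate)` inside `accept` instead, the writer thread in this sketch would stay blocked until the caller released the delegate's monitor. That is the same kind of interaction the question above is worried about for the isReady transition.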





Issue Time Tracking
-------------------

    Worklog Id:     (was: 759990)
    Time Spent: 77h  (was: 76h 50m)

> Optimize Java SDK harness
> -------------------------
>
>                 Key: BEAM-13015
>                 URL: https://issues.apache.org/jira/browse/BEAM-13015
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-harness
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: P2
>          Time Spent: 77h
>  Remaining Estimate: 0h
>
> Use profiling tools to remove bundle processing overhead in the SDK harness.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
