scwhittle commented on code in PR #24853:
URL: https://github.com/apache/beam/pull/24853#discussion_r1063253469


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DirectStreamObserver.java:
##########
@@ -41,80 +41,92 @@
   private static final Logger LOG = 
LoggerFactory.getLogger(DirectStreamObserver.class);
   private final Phaser phaser;
 
-  @GuardedBy("outboundObserver")
+  private final Object lock = new Object();
+
+  @GuardedBy("lock")
   private final CallStreamObserver<T> outboundObserver;
 
   private final long deadlineSeconds;
+  private final int messagesBetweenIsReadyChecks;
 
-  @GuardedBy("outboundObserver")
-  private boolean firstMessage = true;
+  @GuardedBy("lock")
+  private int messagesSinceReady = 0;
 
   public DirectStreamObserver(
-      Phaser phaser, CallStreamObserver<T> outboundObserver, long 
deadlineSeconds) {
+      Phaser phaser,
+      CallStreamObserver<T> outboundObserver,
+      long deadlineSeconds,
+      int messagesBetweenIsReadyChecks) {
     this.phaser = phaser;
     this.outboundObserver = outboundObserver;
     this.deadlineSeconds = deadlineSeconds;
+    // We always let the first message pass through without blocking because 
it is performed under
+    // the StreamPool synchronized block and single header message isn't going 
to cause memory
+    // issues due to excessive buffering within grpc.
+    this.messagesBetweenIsReadyChecks = Math.max(1, 
messagesBetweenIsReadyChecks);
   }
 
   @Override
   public void onNext(T value) {
-    final int phase = phaser.getPhase();
+    int phase = -1;
     long totalSecondsWaited = 0;
     long waitSeconds = 1;
-    while (true) {
-      try {
-        synchronized (outboundObserver) {
-          // We let the first message passthrough without blocking because it 
is performed under the
-          // StreamPool synchronized block and single message isn't going to 
cause memory issues due
-          // to excessive buffering within grpc.
-          if (firstMessage || outboundObserver.isReady()) {
-            firstMessage = false;
-            outboundObserver.onNext(value);
-            return;
+    synchronized (lock) {

Review Comment:
   was thinking it would reduce grabbing/releasing, since other callers will be 
waiting on phaser anyway
   
   But on second thought, that only reduces lock/unlock on timeouts and it 
could have weird interactions wiating on phaser and blocking 
onError/onCompleted.  Changing it back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to