scwhittle commented on code in PR #33419:
URL: https://github.com/apache/beam/pull/33419#discussion_r1903937999


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -53,7 +53,7 @@ public final class DirectStreamObserver<T> implements StreamObserver<T> {
   private final int maxMessagesBeforeCheck;
 
   private final Object lock = new Object();
-  private int numMessages = -1;

Review Comment:
   This seems OK, but it's not clear to me why it is less racy.
   How does initializing to 0 differ from initializing to -1?
   
   Can this field be annotated with @GuardedBy?
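
   To illustrate the annotation question, here is a minimal sketch of a
   lock-guarded counter. The GuardedCounter class and its increment() method
   are hypothetical, and the nested GuardedBy annotation is only a local
   stand-in so the sketch compiles without dependencies. A real build would
   import the annotation from a library such as Error Prone
   (com.google.errorprone.annotations.concurrent.GuardedBy), which can then
   flag unsynchronized accesses at compile time.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical class for illustration; not the actual DirectStreamObserver.
public class GuardedCounter {

  // Local stand-in for a library-provided @GuardedBy annotation (assumption:
  // the real project would import one instead of declaring it here).
  @Retention(RetentionPolicy.CLASS)
  @Target({ElementType.FIELD, ElementType.METHOD})
  @interface GuardedBy {
    String value();
  }

  private final Object lock = new Object();

  // Documents that every read or write of numMessages must hold `lock`.
  @GuardedBy("lock")
  private int numMessages = 0;

  // Increments the counter under the lock and returns the new value.
  public int increment() {
    synchronized (lock) {
      return ++numMessages;
    }
  }
}
```

   The annotation itself does not change runtime behavior; it records the
   locking discipline so that readers and static analysis can verify it.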



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -69,7 +69,7 @@ public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserve
   @Override
   public void onNext(T value) {
     synchronized (lock) {
-      if (++numMessages >= maxMessagesBeforeCheck) {
+      if (++numMessages > maxMessagesBeforeCheck) {

Review Comment:
   It seems that with the -1 -> 0 change and the >= -> > change, the first
   triggered check still happens at the same point, but each subsequent
   check now takes one more message than under the previous, incorrect
   behavior.
   
   For example, if maxMessagesBeforeCheck were 1, we previously checked on
   the 2nd message and then on every message afterwards. Now we correctly
   check on every other message. So if the test fails, it seems the test
   was verifying the bad behavior.
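
   The cadence described above can be reproduced with a small simulation.
   This is a sketch under one assumption that is not visible in the diff:
   the counter is reset to 0 each time a check fires. The CheckCadence class
   and the checkedMessages helper are hypothetical names used only for
   illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of the counter logic; not the actual Beam code.
public class CheckCadence {

  /**
   * Returns the 1-based positions of the messages that trigger a check.
   *
   * @param initial starting value of the counter (-1 before, 0 after the change)
   * @param strict  true to compare with {@code >}, false to compare with {@code >=}
   * @param max     stand-in for maxMessagesBeforeCheck
   * @param total   number of messages to simulate
   */
  public static List<Integer> checkedMessages(int initial, boolean strict, int max, int total) {
    List<Integer> checked = new ArrayList<>();
    int numMessages = initial;
    for (int i = 1; i <= total; i++) {
      ++numMessages;
      boolean fire = strict ? numMessages > max : numMessages >= max;
      if (fire) {
        checked.add(i);
        numMessages = 0; // assumed reset after each check
      }
    }
    return checked;
  }

  public static void main(String[] args) {
    // Old logic: start at -1, compare with >=.
    System.out.println("old: " + checkedMessages(-1, false, 1, 6)); // old: [2, 3, 4, 5, 6]
    // New logic: start at 0, compare with >.
    System.out.println("new: " + checkedMessages(0, true, 1, 6)); // new: [2, 4, 6]
  }
}
```

   Under that assumption, both variants first check on the 2nd message, but
   only the new one keeps checking on every other message.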


