scwhittle commented on code in PR #33419:
URL: https://github.com/apache/beam/pull/33419#discussion_r1903937999
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -53,7 +53,7 @@ public final class DirectStreamObserver<T> implements
StreamObserver<T> {
private final int maxMessagesBeforeCheck;
private final Object lock = new Object();
- private int numMessages = -1;
Review Comment:
This seems OK, but it's not clear to me why this is less racy.
Why is initializing to 0 different from initializing to -1?
Can this be annotated with GuardedBy?
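For reference, a minimal sketch of what a GuardedBy annotation on the counter could look like. The `GuardedBy` declared here is a local stand-in so the snippet compiles on its own; real code would import an existing annotation such as `javax.annotation.concurrent.GuardedBy` or Error Prone's `com.google.errorprone.annotations.concurrent.GuardedBy`. `GuardedCounter` and `increment()` are hypothetical names, not part of DirectStreamObserver.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Local stand-in for a real GuardedBy annotation (assumption: the project
// would import one from a library instead of declaring its own).
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.CLASS)
@interface GuardedBy {
  String value();
}

// Hypothetical class that only illustrates the annotation placement.
final class GuardedCounter {
  private final Object lock = new Object();

  // Documents that every read and write must happen while holding `lock`.
  @GuardedBy("lock")
  private int numMessages = 0;

  int increment() {
    synchronized (lock) {
      return ++numMessages;
    }
  }
}
```

The annotation does not change runtime behavior; it records the locking contract so static analysis can flag unguarded accesses.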
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -69,7 +69,7 @@ public DirectStreamObserver(Phaser phaser,
CallStreamObserver<T> outboundObserve
@Override
public void onNext(T value) {
synchronized (lock) {
- if (++numMessages >= maxMessagesBeforeCheck) {
+ if (++numMessages > maxMessagesBeforeCheck) {
Review Comment:
It seems that with the -1 -> 0 change and the >= -> > change we still have
the same behavior for the first triggered check, but each subsequent
triggered check would take 1 more message than under the previous,
incorrect behavior.
For example, if maxMessagesBeforeCheck was 1, before we would check on the
2nd message and then on every message afterwards. Now we would correctly
check every other message. So if the test fails, it seems like the test was
verifying the bad behavior.
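The cadence described above can be sketched with a small standalone model. This is not the Beam class: `CheckCadence` and `checkedMessages` are hypothetical names, and the model assumes the counter is reset to 0 whenever a check triggers, which the diff hunk does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model of the counter logic only; not DirectStreamObserver.
final class CheckCadence {

  /**
   * Returns the 1-based indices of the messages that trigger a check.
   * Assumption: the counter resets to 0 after each triggered check.
   */
  static List<Integer> checkedMessages(
      int initialCount,
      boolean strictComparison,
      int maxMessagesBeforeCheck,
      int totalMessages) {
    List<Integer> checked = new ArrayList<>();
    int numMessages = initialCount;
    for (int message = 1; message <= totalMessages; message++) {
      numMessages++;
      boolean trigger =
          strictComparison
              ? numMessages > maxMessagesBeforeCheck // new condition
              : numMessages >= maxMessagesBeforeCheck; // old condition
      if (trigger) {
        numMessages = 0;
        checked.add(message);
      }
    }
    return checked;
  }

  public static void main(String[] args) {
    // Old code: start at -1, compare with >=.
    System.out.println("before: " + checkedMessages(-1, false, 1, 6));
    // New code: start at 0, compare with >.
    System.out.println("after:  " + checkedMessages(0, true, 1, 6));
  }
}
```

Under that reset assumption and with maxMessagesBeforeCheck = 1, the old variant checks on messages 2, 3, 4, 5, 6 and the new variant on messages 2, 4, 6. The first check lands on the same message in both, and only the later spacing differs.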