lukecwik commented on code in PR #17358:
URL: https://github.com/apache/beam/pull/17358#discussion_r859380353
##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,41 +66,47 @@ public DirectStreamObserver(Phaser phaser,
CallStreamObserver<T> outboundObserve
@Override
public void onNext(T value) {
- if (maxMessagesBeforeCheck <= 1
- || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
- int waitTime = 1;
- int totalTimeWaited = 0;
- int phase = phaser.getPhase();
- while (!outboundObserver.isReady()) {
- try {
- phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- totalTimeWaited += waitTime;
- waitTime = waitTime * 2;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ synchronized (outboundObserver) {
+ if (++numMessages >= maxMessagesBeforeCheck) {
+ numMessages = 0;
+ int waitTime = 1;
+ int totalTimeWaited = 0;
+ int phase = phaser.getPhase();
+ // Record the initial phase in case we are in the inbound gRPC thread
where the phase won't
+ // advance.
+ int initialPhase = phase;
+ while (!outboundObserver.isReady()) {
+ try {
+ phaser.awaitAdvanceInterruptibly(phase, waitTime,
TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ totalTimeWaited += waitTime;
+ waitTime = waitTime * 2;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ // There is a chance that we were spuriously woken up but the
outboundObserver is no
Review Comment:
done
##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,41 +66,47 @@ public DirectStreamObserver(Phaser phaser,
CallStreamObserver<T> outboundObserve
@Override
public void onNext(T value) {
- if (maxMessagesBeforeCheck <= 1
- || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
- int waitTime = 1;
- int totalTimeWaited = 0;
- int phase = phaser.getPhase();
- while (!outboundObserver.isReady()) {
- try {
- phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- totalTimeWaited += waitTime;
- waitTime = waitTime * 2;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ synchronized (outboundObserver) {
+ if (++numMessages >= maxMessagesBeforeCheck) {
+ numMessages = 0;
+ int waitTime = 1;
+ int totalTimeWaited = 0;
+ int phase = phaser.getPhase();
+ // Record the initial phase in case we are in the inbound gRPC thread
where the phase won't
+ // advance.
+ int initialPhase = phase;
+ while (!outboundObserver.isReady()) {
+ try {
+ phaser.awaitAdvanceInterruptibly(phase, waitTime,
TimeUnit.SECONDS);
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]