scwhittle commented on code in PR #17358:
URL: https://github.com/apache/beam/pull/17358#discussion_r859565520
##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,55 +67,58 @@ public DirectStreamObserver(Phaser phaser,
CallStreamObserver<T> outboundObserve
@Override
public void onNext(T value) {
- if (maxMessagesBeforeCheck <= 1
- || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
- int waitTime = 1;
- int totalTimeWaited = 0;
- int phase = phaser.getPhase();
- while (!outboundObserver.isReady()) {
- try {
- phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- totalTimeWaited += waitTime;
- waitTime = waitTime * 2;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ synchronized (lock) {
+ if (++numMessages >= maxMessagesBeforeCheck) {
+ numMessages = 0;
+ int waitTime = 1;
+ int totalTimeWaited = 0;
+ int phase = phaser.getPhase();
+ // Record the initial phase in case we are in the inbound gRPC thread
where the phase won't
+ // advance.
+ int initialPhase = phase;
+ while (!outboundObserver.isReady()) {
+ try {
+ phase = phaser.awaitAdvanceInterruptibly(phase, waitTime,
TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ totalTimeWaited += waitTime;
+ waitTime = waitTime * 2;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
}
- }
- if (totalTimeWaited > 0) {
- // If the phase didn't change, this means that the installed onReady
callback had not
- // been invoked.
- if (phase == phaser.getPhase()) {
- LOG.info(
- "Output channel stalled for {}s, outbound thread {}. See: "
- + "https://issues.apache.org/jira/browse/BEAM-4280 for the
history for "
- + "this issue.",
- totalTimeWaited,
- Thread.currentThread().getName());
- } else {
- LOG.debug(
- "Output channel stalled for {}s, outbound thread {}.",
- totalTimeWaited,
- Thread.currentThread().getName());
+ if (totalTimeWaited > 0) {
+ // If the phase didn't change, this means that the installed onReady
callback had not
+ // been invoked.
+ if (initialPhase == phaser.getPhase()) {
Review Comment:
I think you could just use `phase` instead of calling `phaser.getPhase()` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]