[
https://issues.apache.org/jira/browse/BEAM-13015?focusedWorklogId=762761&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762761
]
ASF GitHub Bot logged work on BEAM-13015:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 27/Apr/22 09:11
Start Date: 27/Apr/22 09:11
Worklog Time Spent: 10m
Work Description: scwhittle commented on code in PR #17358:
URL: https://github.com/apache/beam/pull/17358#discussion_r859565520
##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,55 +67,58 @@ public DirectStreamObserver(Phaser phaser,
CallStreamObserver<T> outboundObserve
@Override
public void onNext(T value) {
- if (maxMessagesBeforeCheck <= 1
- || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
- int waitTime = 1;
- int totalTimeWaited = 0;
- int phase = phaser.getPhase();
- while (!outboundObserver.isReady()) {
- try {
- phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- totalTimeWaited += waitTime;
- waitTime = waitTime * 2;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ synchronized (lock) {
+ if (++numMessages >= maxMessagesBeforeCheck) {
+ numMessages = 0;
+ int waitTime = 1;
+ int totalTimeWaited = 0;
+ int phase = phaser.getPhase();
+ // Record the initial phase in case we are in the inbound gRPC thread
where the phase won't
+ // advance.
+ int initialPhase = phase;
+ while (!outboundObserver.isReady()) {
+ try {
+ phase = phaser.awaitAdvanceInterruptibly(phase, waitTime,
TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ totalTimeWaited += waitTime;
+ waitTime = waitTime * 2;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
}
- }
- if (totalTimeWaited > 0) {
- // If the phase didn't change, this means that the installed onReady
callback had not
- // been invoked.
- if (phase == phaser.getPhase()) {
- LOG.info(
- "Output channel stalled for {}s, outbound thread {}. See: "
- + "https://issues.apache.org/jira/browse/BEAM-4280 for the
history for "
- + "this issue.",
- totalTimeWaited,
- Thread.currentThread().getName());
- } else {
- LOG.debug(
- "Output channel stalled for {}s, outbound thread {}.",
- totalTimeWaited,
- Thread.currentThread().getName());
+ if (totalTimeWaited > 0) {
+ // If the phase didn't change, this means that the installed onReady
callback had not
+ // been invoked.
+ if (initialPhase == phaser.getPhase()) {
Review Comment:
I think you could just use phase instead of getPhase() here
Issue Time Tracking
-------------------
Worklog Id: (was: 762761)
Time Spent: 78.5h (was: 78h 20m)
> Optimize Java SDK harness
> -------------------------
>
> Key: BEAM-13015
> URL: https://issues.apache.org/jira/browse/BEAM-13015
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-harness
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: P2
> Time Spent: 78.5h
> Remaining Estimate: 0h
>
> Use profiling tools to remove bundle processing overhead in the SDK harness.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)