[ https://issues.apache.org/jira/browse/BEAM-13015?focusedWorklogId=762707&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-762707 ]
ASF GitHub Bot logged work on BEAM-13015:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 27/Apr/22 04:55
Start Date: 27/Apr/22 04:55
Worklog Time Spent: 10m
Work Description: lukecwik commented on code in PR #17358:
URL: https://github.com/apache/beam/pull/17358#discussion_r859380353
##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,41 +66,47 @@ public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserve
@Override
public void onNext(T value) {
- if (maxMessagesBeforeCheck <= 1
- || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
- int waitTime = 1;
- int totalTimeWaited = 0;
- int phase = phaser.getPhase();
- while (!outboundObserver.isReady()) {
- try {
- phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- totalTimeWaited += waitTime;
- waitTime = waitTime * 2;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ synchronized (outboundObserver) {
+ if (++numMessages >= maxMessagesBeforeCheck) {
+ numMessages = 0;
+ int waitTime = 1;
+ int totalTimeWaited = 0;
+ int phase = phaser.getPhase();
+ // Record the initial phase in case we are in the inbound gRPC thread where the phase won't
+ // advance.
+ int initialPhase = phase;
+ while (!outboundObserver.isReady()) {
+ try {
+ phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ totalTimeWaited += waitTime;
+ waitTime = waitTime * 2;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ // There is a chance that we were spuriously woken up but the outboundObserver is no
Review Comment:
done
##########
sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/stream/DirectStreamObserver.java:
##########
@@ -60,41 +66,47 @@ public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> outboundObserve
@Override
public void onNext(T value) {
- if (maxMessagesBeforeCheck <= 1
- || numMessages.incrementAndGet() % maxMessagesBeforeCheck == 0) {
- int waitTime = 1;
- int totalTimeWaited = 0;
- int phase = phaser.getPhase();
- while (!outboundObserver.isReady()) {
- try {
- phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
- } catch (TimeoutException e) {
- totalTimeWaited += waitTime;
- waitTime = waitTime * 2;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
+ synchronized (outboundObserver) {
+ if (++numMessages >= maxMessagesBeforeCheck) {
+ numMessages = 0;
+ int waitTime = 1;
+ int totalTimeWaited = 0;
+ int phase = phaser.getPhase();
+ // Record the initial phase in case we are in the inbound gRPC thread where the phase won't
+ // advance.
+ int initialPhase = phase;
+ while (!outboundObserver.isReady()) {
+ try {
+ phaser.awaitAdvanceInterruptibly(phase, waitTime, TimeUnit.SECONDS);
Review Comment:
done
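For readers following the hunks above, the wait loop they touch can be sketched in isolation as below. This is a minimal illustration, not the code in PR #17358: the class name `PhaserBackoffSketch`, the helper `awaitReady`, the `BooleanSupplier` standing in for `outboundObserver.isReady()`, and the millisecond timeouts are all assumptions made for the example. It also assigns the return value of `awaitAdvanceInterruptibly` back to `phase`, which is a choice of this sketch and is not shown in the quoted diff.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical stand-alone sketch of "wait on a Phaser with a doubling timeout".
final class PhaserBackoffSketch {

  /**
   * Blocks until isReady reports true. Each time the phaser wait times out,
   * the timeout is doubled. Returns how many timeouts occurred.
   */
  static int awaitReady(Phaser phaser, BooleanSupplier isReady, long initialWaitMillis) {
    long waitMillis = initialWaitMillis;
    int timeouts = 0;
    int phase = phaser.getPhase();
    while (!isReady.getAsBoolean()) {
      try {
        // Returns the next phase number once the phaser advances past `phase`.
        // Assumption of this sketch: track the new phase so later waits block again.
        phase = phaser.awaitAdvanceInterruptibly(phase, waitMillis, TimeUnit.MILLISECONDS);
      } catch (TimeoutException e) {
        timeouts++;
        waitMillis *= 2; // exponential backoff, as in the quoted loop
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt flag
        throw new RuntimeException(e);
      }
    }
    return timeouts;
  }

  public static void main(String[] args) throws InterruptedException {
    Phaser phaser = new Phaser(1); // one registered party, so arrive() advances the phase
    AtomicBoolean ready = new AtomicBoolean(false);

    // Stand-in for an on-ready callback: flip the flag, then advance the phase.
    Thread signaller = new Thread(() -> {
      try {
        Thread.sleep(50);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      ready.set(true);
      phaser.arrive();
    });
    signaller.start();

    int timeouts = awaitReady(phaser, ready::get, 10);
    signaller.join();
    System.out.println("ready after " + timeouts + " timeout(s)");
  }
}
```

Setting the readiness flag before calling `arrive()` matters in this sketch: a waiter that wakes on the phase advance is then guaranteed to observe the flag as true when it rechecks the loop condition.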
Issue Time Tracking
-------------------
Worklog Id: (was: 762707)
Time Spent: 78h 10m (was: 78h)
> Optimize Java SDK harness
> -------------------------
>
> Key: BEAM-13015
> URL: https://issues.apache.org/jira/browse/BEAM-13015
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-harness
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: P2
> Time Spent: 78h 10m
> Remaining Estimate: 0h
>
> Use profiling tools to remove bundle processing overhead in the SDK harness.