This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b850458 Fixing data race in DataflowSideInputReadCounter
new 574d56e Merge pull request #8080 from pabloem/fix-sic
b850458 is described below
commit b85045828232f9c31d788d0768e416230e198dc7
Author: pabloem <[email protected]>
AuthorDate: Mon Mar 18 10:16:51 2019 -0700
Fixing data race in DataflowSideInputReadCounter
---
.../beam/runners/dataflow/worker/DataflowSideInputReadCounter.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounter.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounter.java
index 639b6c7..8b71029 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounter.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounter.java
@@ -110,10 +110,10 @@ public class DataflowSideInputReadCounter implements
SideInputReadCounter {
this.declaringOperationContext = operationContext;
byteCounters = new HashMap<>();
executionStates = new HashMap<>();
- checkState();
+ updateCurrentStateIfOutdated();
}
- private void checkState() {
+ private void updateCurrentStateIfOutdated() {
DataflowExecutionState currentState =
(DataflowExecutionState)
executionContext.getExecutionStateTracker().getCurrentState();
if (currentState == null
@@ -160,11 +160,11 @@ public class DataflowSideInputReadCounter implements
SideInputReadCounter {
@Override
public Closeable enter() {
- checkState();
// Only update status from tracked thread to avoid race condition and
inconsistent state updates
if (executionContext.getExecutionStateTracker().getTrackedThread() !=
Thread.currentThread()) {
return () -> {};
}
+ updateCurrentStateIfOutdated();
return
executionContext.getExecutionStateTracker().enterState(currentExecutionState);
}