This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b850458  Fixing data race in DataflowSideInputReadCounter
     new 574d56e  Merge pull request #8080 from pabloem/fix-sic
b850458 is described below

commit b85045828232f9c31d788d0768e416230e198dc7
Author: pabloem <[email protected]>
AuthorDate: Mon Mar 18 10:16:51 2019 -0700

    Fixing data race in DataflowSideInputReadCounter
---
 .../beam/runners/dataflow/worker/DataflowSideInputReadCounter.java  | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounter.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounter.java
index 639b6c7..8b71029 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounter.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowSideInputReadCounter.java
@@ -110,10 +110,10 @@ public class DataflowSideInputReadCounter implements 
SideInputReadCounter {
     this.declaringOperationContext = operationContext;
     byteCounters = new HashMap<>();
     executionStates = new HashMap<>();
-    checkState();
+    updateCurrentStateIfOutdated();
   }
 
-  private void checkState() {
+  private void updateCurrentStateIfOutdated() {
     DataflowExecutionState currentState =
         (DataflowExecutionState) 
executionContext.getExecutionStateTracker().getCurrentState();
     if (currentState == null
@@ -160,11 +160,11 @@ public class DataflowSideInputReadCounter implements 
SideInputReadCounter {
 
   @Override
   public Closeable enter() {
-    checkState();
     // Only update status from tracked thread to avoid race condition and 
inconsistent state updates
     if (executionContext.getExecutionStateTracker().getTrackedThread() != 
Thread.currentThread()) {
       return () -> {};
     }
+    updateCurrentStateIfOutdated();
     return 
executionContext.getExecutionStateTracker().enterState(currentExecutionState);
   }
 

Reply via email to