This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f0abb97  [BEAM-7006] Fix some race conditions in sdf splitting.
     new 109eb32  Merge pull request #8256 from robertwb/flaky-sdf
f0abb97 is described below

commit f0abb977abd7c08ee1f3792fc0becbb1321d5bb6
Author: Robert Bradshaw <[email protected]>
AuthorDate: Tue Apr 9 14:27:01 2019 +0200

    [BEAM-7006] Fix some race conditions in sdf splitting.
    
    * Splitting could be requested before operation started.
    * An element could finish while we were splitting.
---
 sdks/python/apache_beam/runners/worker/bundle_processor.py | 13 +++++++++++--
 sdks/python/apache_beam/runners/worker/operations.py       |  6 ++++--
 2 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index feb8eae..3eed25d 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -118,11 +118,14 @@ class DataInputOperation(RunnerIOOperation):
             self.counter_factory, self.name_context.step_name, 0,
             next(iter(itervalues(consumers))), self.windowed_coder)]
     self.splitting_lock = threading.Lock()
+    self.started = False
 
   def start(self):
     super(DataInputOperation, self).start()
-    self.index = -1
-    self.stop = float('inf')
+    with self.splitting_lock:
+      self.index = -1
+      self.stop = float('inf')
+      self.started = True
 
   def process(self, windowed_value):
     self.output(windowed_value)
@@ -140,6 +143,8 @@ class DataInputOperation(RunnerIOOperation):
 
   def try_split(self, fraction_of_remainder, total_buffer_size):
     with self.splitting_lock:
+      if not self.started:
+        return
       if total_buffer_size < self.index + 1:
         total_buffer_size = self.index + 1
       elif self.stop and total_buffer_size > self.stop:
@@ -178,6 +183,10 @@ class DataInputOperation(RunnerIOOperation):
         self.stop = stop_index
         return self.stop - 1, None, None, self.stop
 
+  def finish(self):
+    with self.splitting_lock:
+      self.started = False
+
 
 class _StateBackedIterable(object):
   def __init__(self, state_handler, state_key, coder_or_impl):
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 8478866..de47cdd 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -645,8 +645,10 @@ class SdfProcessSizedElements(DoOperation):
   def current_element_progress(self):
     with self.lock:
       if self.element_start_output_bytes is not None:
-        return self.dofn_runner.current_element_progress().with_completed(
-            self._total_output_bytes() - self.element_start_output_bytes)
+        progress = self.dofn_runner.current_element_progress()
+        if progress is not None:
+          return progress.with_completed(
+              self._total_output_bytes() - self.element_start_output_bytes)
 
   def progress_metrics(self):
     with self.lock:

Reply via email to