This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f0abb97 [BEAM-7006] Fix some race conditions in sdf splitting.
new 109eb32 Merge pull request #8256 from robertwb/flaky-sdf
f0abb97 is described below
commit f0abb977abd7c08ee1f3792fc0becbb1321d5bb6
Author: Robert Bradshaw <[email protected]>
AuthorDate: Tue Apr 9 14:27:01 2019 +0200
[BEAM-7006] Fix some race conditions in sdf splitting.
* Splitting could be requested before operation started.
* An element could finish while we were splitting.
---
sdks/python/apache_beam/runners/worker/bundle_processor.py | 13 +++++++++++--
sdks/python/apache_beam/runners/worker/operations.py | 6 ++++--
2 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index feb8eae..3eed25d 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -118,11 +118,14 @@ class DataInputOperation(RunnerIOOperation):
self.counter_factory, self.name_context.step_name, 0,
next(iter(itervalues(consumers))), self.windowed_coder)]
self.splitting_lock = threading.Lock()
+ self.started = False
def start(self):
super(DataInputOperation, self).start()
- self.index = -1
- self.stop = float('inf')
+ with self.splitting_lock:
+ self.index = -1
+ self.stop = float('inf')
+ self.started = True
def process(self, windowed_value):
self.output(windowed_value)
@@ -140,6 +143,8 @@ class DataInputOperation(RunnerIOOperation):
def try_split(self, fraction_of_remainder, total_buffer_size):
with self.splitting_lock:
+ if not self.started:
+ return
if total_buffer_size < self.index + 1:
total_buffer_size = self.index + 1
elif self.stop and total_buffer_size > self.stop:
@@ -178,6 +183,10 @@ class DataInputOperation(RunnerIOOperation):
self.stop = stop_index
return self.stop - 1, None, None, self.stop
+ def finish(self):
+ with self.splitting_lock:
+ self.started = False
+
class _StateBackedIterable(object):
def __init__(self, state_handler, state_key, coder_or_impl):
diff --git a/sdks/python/apache_beam/runners/worker/operations.py
b/sdks/python/apache_beam/runners/worker/operations.py
index 8478866..de47cdd 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -645,8 +645,10 @@ class SdfProcessSizedElements(DoOperation):
def current_element_progress(self):
with self.lock:
if self.element_start_output_bytes is not None:
- return self.dofn_runner.current_element_progress().with_completed(
- self._total_output_bytes() - self.element_start_output_bytes)
+ progress = self.dofn_runner.current_element_progress()
+ if progress is not None:
+ return progress.with_completed(
+ self._total_output_bytes() - self.element_start_output_bytes)
def progress_metrics(self):
with self.lock: