This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e8bda0e Fix data races in BCJ and RecordingManager
new 5d81e84 Merge pull request #12797 from [BEAM-10603] Fix data races in BCJ and RecordingManager
e8bda0e is described below
commit e8bda0ef9c7a99cdc600e3f280d5f80b5328e8de
Author: Sam Rohde <[email protected]>
AuthorDate: Wed Sep 9 10:47:31 2020 -0700
Fix data races in BCJ and RecordingManager
There were some data races regarding the non-threadsafe PipelineResult
in the BackgroundCachingJob and the RecordingManager. This adds locks
around PipelineResults.
Change-Id: I612171dc6ef9225335cfeb1bfec9871986e43c47
---
.../runners/interactive/background_caching_job.py | 44 +++++++++++++++-------
.../runners/interactive/recording_manager.py | 21 +++++++----
2 files changed, 45 insertions(+), 20 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
index ec1c3c4..0e368fb 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
@@ -68,6 +68,7 @@ class BackgroundCachingJob(object):
"""
def __init__(self, pipeline_result, limiters):
self._pipeline_result = pipeline_result
+ self._result_lock = threading.RLock()
self._condition_checker = threading.Thread(
target=self._background_caching_job_condition_checker, daemon=True)
@@ -77,7 +78,11 @@ class BackgroundCachingJob(object):
self._condition_checker.start()
def _background_caching_job_condition_checker(self):
- while not PipelineState.is_terminal(self._pipeline_result.state):
+ while True:
+ with self._result_lock:
+ if PipelineState.is_terminal(self._pipeline_result.state):
+ break
+
if self._should_end_condition_checker():
self.cancel()
break
@@ -87,30 +92,41 @@ class BackgroundCachingJob(object):
return any([l.is_triggered() for l in self._limiters])
def is_done(self):
- is_terminated = self._pipeline_result.state in (
- PipelineState.DONE, PipelineState.CANCELLED)
- is_triggered = self._should_end_condition_checker()
- is_cancelling = self._pipeline_result.state is PipelineState.CANCELLING
+ with self._result_lock:
+ is_terminated = self._pipeline_result.state in (
+ PipelineState.DONE, PipelineState.CANCELLED)
+ is_triggered = self._should_end_condition_checker()
+ is_cancelling = self._pipeline_result.state is PipelineState.CANCELLING
return is_terminated or (is_triggered and is_cancelling)
def is_running(self):
- return self._pipeline_result.state is PipelineState.RUNNING
+ with self._result_lock:
+ return self._pipeline_result.state is PipelineState.RUNNING
def cancel(self):
"""Cancels this background caching job.
"""
- if not PipelineState.is_terminal(self._pipeline_result.state):
- try:
- self._pipeline_result.cancel()
- self._pipeline_result.wait_until_finish()
- except NotImplementedError:
- # Ignore the cancel invocation if it is never implemented by the runner.
- pass
+ with self._result_lock:
+ if not PipelineState.is_terminal(self._pipeline_result.state):
+ try:
+ self._pipeline_result.cancel()
+ # self._pipeline_result.wait_until_finish()
+ except NotImplementedError:
+ # Ignore the cancel invocation if it is never implemented by the
+ # runner.
+ pass
+
+ @property
+ def state(self):
+ with self._result_lock:
+ return self._pipeline_result.state
def attempt_to_run_background_caching_job(runner, user_pipeline, options=None):
"""Attempts to run a background caching job for a user-defined pipeline.
+ Returns True if a job was started, False otherwise.
+
The pipeline result is automatically tracked by Interactive Beam in case
future cancellation/cleanup is needed.
"""
@@ -134,6 +150,8 @@ def attempt_to_run_background_caching_job(runner, user_pipeline, options=None):
ie.current_env().set_background_caching_job(
user_pipeline,
BackgroundCachingJob(background_caching_job_result, limiters=limiters))
+ return True
+ return False
def is_background_caching_job_needed(user_pipeline):
diff --git a/sdks/python/apache_beam/runners/interactive/recording_manager.py b/sdks/python/apache_beam/runners/interactive/recording_manager.py
index a5ce082..f67eb46 100644
--- a/sdks/python/apache_beam/runners/interactive/recording_manager.py
+++ b/sdks/python/apache_beam/runners/interactive/recording_manager.py
@@ -149,6 +149,7 @@ class Recording:
self._user_pipeline = user_pipeline
self._result = result
+ self._result_lock = threading.Lock()
self._pcolls = pcolls
pcoll_var = lambda pcoll: pipeline_instrument.cacheable_var_by_pcoll_id(
@@ -182,13 +183,18 @@ class Recording:
return
while not PipelineState.is_terminal(self._result.state):
- if time.time() - self._start >= self._duration_secs:
- self._result.cancel()
- self._result.wait_until_finish()
+ with self._result_lock:
+ bcj = ie.current_env().get_background_caching_job(self._user_pipeline)
+ if bcj and bcj.is_done():
+ self._result.wait_until_finish()
- elif all(s.is_done() for s in self._streams.values()):
- self._result.cancel()
- self._result.wait_until_finish()
+ elif time.time() - self._start >= self._duration_secs:
+ self._result.cancel()
+ self._result.wait_until_finish()
+
+ elif all(s.is_done() for s in self._streams.values()):
+ self._result.cancel()
+ self._result.wait_until_finish()
time.sleep(0.1)
@@ -225,7 +231,8 @@ class Recording:
# type: () -> None
"""Cancels the recording."""
- self._result.cancel()
+ with self._result_lock:
+ self._result.cancel()
def wait_until_finish(self):
# type: () -> None