pabloem commented on a change in pull request #11389: Refactor the BCJ and
capture controls to be more testable
URL: https://github.com/apache/beam/pull/11389#discussion_r407693399
##########
File path:
sdks/python/apache_beam/runners/interactive/background_caching_job.py
##########
@@ -66,61 +66,40 @@ class BackgroundCachingJob(object):
In both situations, the background caching job should be treated as done
successfully.
"""
- def __init__(self, pipeline_result, start_limit_checkers=True):
+ def __init__(self, pipeline_result, limiters):
self._pipeline_result = pipeline_result
- self._timer = threading.Timer(
- ie.current_env().options.capture_duration.total_seconds(),
self._cancel)
- self._timer.daemon = True
self._condition_checker = threading.Thread(
target=self._background_caching_job_condition_checker, daemon=True)
- if start_limit_checkers:
- self._timer.start()
- self._condition_checker.start()
- self._timer_triggered = False
- self._condition_checker_triggered = False
+
+ # Limiters are checks s.t. if any are triggered then the background caching
+ # job gets cancelled.
+ self._limiters = limiters
+ self._condition_checker.start()
def _background_caching_job_condition_checker(self):
while not PipelineState.is_terminal(self._pipeline_result.state):
if self._should_end_condition_checker():
+ self.cancel()
break
- time.sleep(5)
+ time.sleep(0.5)
Review comment:
maybe the wait time should be parameterizable? Is it pretty cheap to check
the limiters? Up to you.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services