Repository: beam
Updated Branches:
  refs/heads/master 4f0146a7e -> d4ce94f4e
Clean up DirectRunner Clock and TransformResult Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4d98d43e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4d98d43e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4d98d43e Branch: refs/heads/master Commit: 4d98d43e5f3bb7b112986a7e3807a3fc2812b641 Parents: 4f0146a Author: Charles Chen <[email protected]> Authored: Tue Apr 18 15:18:40 2017 +0800 Committer: Ahmet Altay <[email protected]> Committed: Tue Apr 18 09:32:00 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/direct/clock.py | 9 ++-- .../runners/direct/evaluation_context.py | 11 ++--- .../apache_beam/runners/direct/executor.py | 4 +- .../runners/direct/transform_result.py | 45 +++++--------------- .../runners/direct/watermark_manager.py | 4 +- 5 files changed, 23 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/clock.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/clock.py b/sdks/python/apache_beam/runners/direct/clock.py index 11e49cd..dd1800a 100644 --- a/sdks/python/apache_beam/runners/direct/clock.py +++ b/sdks/python/apache_beam/runners/direct/clock.py @@ -24,8 +24,7 @@ import time class Clock(object): - @property - def now(self): + def time(self): """Returns the number of milliseconds since epoch.""" return int(time.time() * 1000) @@ -36,12 +35,10 @@ class MockClock(Clock): def __init__(self, now_in_ms): self._now_in_ms = now_in_ms - @property - def now(self): + def time(self): return self._now_in_ms - @now.setter - def now(self, value_in_ms): + def set_time(self, value_in_ms): assert value_in_ms >= self._now_in_ms self._now_in_ms = value_in_ms 
http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/evaluation_context.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 2169c7c..68d99d3 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -199,20 +199,21 @@ class EvaluationContext(object): the committed bundles contained within the handled result. """ with self._lock: - committed_bundles = self._commit_bundles(result.output_bundles) + committed_bundles = self._commit_bundles( + result.uncommitted_output_bundles) self._watermark_manager.update_watermarks( completed_bundle, result.transform, completed_timers, committed_bundles, result.watermark_hold) self._metrics.commit_logical(completed_bundle, - result.logical_metric_updates()) + result.logical_metric_updates) # If the result is for a view, update side inputs container. - if (result.output_bundles - and result.output_bundles[0].pcollection + if (result.uncommitted_output_bundles + and result.uncommitted_output_bundles[0].pcollection in self._pcollection_to_views): for view in self._pcollection_to_views[ - result.output_bundles[0].pcollection]: + result.uncommitted_output_bundles[0].pcollection]: for committed_bundle in committed_bundles: # side_input must be materialized. 
self._side_inputs_container.add_values( http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index f6a1d7f..da06158 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -292,10 +292,10 @@ class TransformExecutor(ExecutorService.CallableTask): with scoped_metrics_container: result = evaluator.finish_bundle() - result.metric_updates = metrics_container.get_cumulative() + result.logical_metric_updates = metrics_container.get_cumulative() if self._evaluation_context.has_cache: - for uncommitted_bundle in result.output_bundles: + for uncommitted_bundle in result.uncommitted_output_bundles: self._evaluation_context.append_to_cache( self._applied_transform, uncommitted_bundle.tag, uncommitted_bundle.get_elements_iterable()) http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/transform_result.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py index 59597dc..8ae0aea 100644 --- a/sdks/python/apache_beam/runners/direct/transform_result.py +++ b/sdks/python/apache_beam/runners/direct/transform_result.py @@ -26,39 +26,14 @@ class TransformResult(object): def __init__(self, applied_ptransform, uncommitted_output_bundles, state, timer_update, counters, watermark_hold, undeclared_tag_values=None): - self._applied_ptransform = applied_ptransform - self._uncommitted_output_bundles = uncommitted_output_bundles - self._state = state - self._timer_update = timer_update - self._counters = counters - self._watermark_hold = watermark_hold + self.transform = applied_ptransform + 
self.uncommitted_output_bundles = uncommitted_output_bundles + self.state = state + # TODO: timer update is currently unused. + self.timer_update = timer_update + self.counters = counters + self.watermark_hold = watermark_hold # Only used when caching (materializing) all values is requested. - self._undeclared_tag_values = undeclared_tag_values - self.metric_updates = None - - def logical_metric_updates(self): - return self.metric_updates - - @property - def transform(self): - return self._applied_ptransform - - @property - def output_bundles(self): - return self._uncommitted_output_bundles - - @property - def state(self): - return self._state - - @property - def counters(self): - return self._counters - - @property - def watermark_hold(self): - return self._watermark_hold - - @property - def undeclared_tag_values(self): - return self._undeclared_tag_values + self.undeclared_tag_values = undeclared_tag_values + # Populated by the TransformExecutor. + self.logical_metric_updates = None http://git-wip-us.apache.org/repos/asf/beam/blob/4d98d43e/sdks/python/apache_beam/runners/direct/watermark_manager.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py index 1f18ef6..19d9085 100644 --- a/sdks/python/apache_beam/runners/direct/watermark_manager.py +++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py @@ -210,12 +210,12 @@ class TransformWatermarks(object): @property def synchronized_processing_output_time(self): - return self._clock.now + return self._clock.time() def extract_fired_timers(self): with self._lock: if self._fired_timers: - return False + return False should_fire = ( self._earliest_hold < WatermarkManager.WATERMARK_POS_INF and
