Repository: beam Updated Branches: refs/heads/master 50acc6c20 -> d1d78121b
Use state / timer API for DirectRunner timer firings Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/56041b78 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/56041b78 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/56041b78 Branch: refs/heads/master Commit: 56041b7850abfbb10d4a6ff2ddecb227a0a4e7c8 Parents: 50acc6c Author: Charles Chen <[email protected]> Authored: Tue Jun 20 15:22:58 2017 -0700 Committer: Ahmet Altay <[email protected]> Committed: Wed Jun 21 09:23:13 2017 -0700 ---------------------------------------------------------------------- .../runners/direct/evaluation_context.py | 3 +- .../apache_beam/runners/direct/executor.py | 37 ++++++++----- .../runners/direct/transform_evaluator.py | 48 +++++++++++++--- .../runners/direct/transform_result.py | 40 -------------- sdks/python/apache_beam/runners/direct/util.py | 58 ++++++++++++++++++++ .../runners/direct/watermark_manager.py | 56 +++++++++++-------- sdks/python/apache_beam/transforms/trigger.py | 10 +++- 7 files changed, 163 insertions(+), 89 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/evaluation_context.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 8fa8e06..976e9e8 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -148,7 +148,8 @@ class EvaluationContext(object): self._transform_keyed_states = self._initialize_keyed_states( root_transforms, value_to_consumers) self._watermark_manager = WatermarkManager( - Clock(), root_transforms, value_to_consumers) + Clock(), root_transforms, value_to_consumers, + self._transform_keyed_states) self._side_inputs_container = _SideInputsContainer(views) self._pending_unblocked_tasks = [] self._counter_factory = counters.CounterFactory() http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index eff2d3c..a0a3886 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -222,14 +222,14 @@ class _CompletionCallback(object): or for a source transform. """ - def __init__(self, evaluation_context, all_updates, timers=None): + def __init__(self, evaluation_context, all_updates, timer_firings=None): self._evaluation_context = evaluation_context self._all_updates = all_updates - self._timers = timers + self._timer_firings = timer_firings or [] def handle_result(self, input_committed_bundle, transform_result): output_committed_bundles = self._evaluation_context.handle_result( - input_committed_bundle, self._timers, transform_result) + input_committed_bundle, self._timer_firings, transform_result) for output_committed_bundle in output_committed_bundles: self._all_updates.offer(_ExecutorServiceParallelExecutor._ExecutorUpdate( output_committed_bundle, None)) @@ -251,11 +251,12 @@ class TransformExecutor(_ExecutorService.CallableTask): """ def __init__(self, transform_evaluator_registry, evaluation_context, - input_bundle, applied_ptransform, completion_callback, - transform_evaluation_state): + input_bundle, fired_timers, applied_ptransform, + completion_callback, transform_evaluation_state): self._transform_evaluator_registry = transform_evaluator_registry self._evaluation_context = evaluation_context self._input_bundle = input_bundle + self._fired_timers = fired_timers self._applied_ptransform = applied_ptransform self._completion_callback = completion_callback self._transform_evaluation_state = transform_evaluation_state @@ -288,6 +289,10 @@ class TransformExecutor(_ExecutorService.CallableTask): self._applied_ptransform, self._input_bundle, side_input_values, scoped_metrics_container) + if self._fired_timers: + for timer_firing in self._fired_timers: + evaluator.process_timer_wrapper(timer_firing) + if self._input_bundle: for value in self._input_bundle.get_elements_iterable(): evaluator.process_element(value) @@ -379,11 +384,11 @@ class _ExecutorServiceParallelExecutor(object): if committed_bundle.pcollection in self.value_to_consumers: consumers = self.value_to_consumers[committed_bundle.pcollection] for applied_ptransform in consumers: - self.schedule_consumption(applied_ptransform, committed_bundle, + self.schedule_consumption(applied_ptransform, committed_bundle, [], self.default_completion_callback) def schedule_consumption(self, consumer_applied_ptransform, committed_bundle, - on_complete): + fired_timers, on_complete): """Schedules evaluation of the given bundle with the transform.""" assert consumer_applied_ptransform assert committed_bundle @@ -397,8 +402,8 @@ class _ExecutorServiceParallelExecutor(object): transform_executor = TransformExecutor( self.transform_evaluator_registry, self.evaluation_context, - committed_bundle, consumer_applied_ptransform, on_complete, - transform_executor_service) + committed_bundle, fired_timers, consumer_applied_ptransform, + on_complete, transform_executor_service) transform_executor_service.schedule(transform_executor) class _TypedUpdateQueue(object): @@ -527,19 +532,21 @@ class _ExecutorServiceParallelExecutor(object): Returns: True if timers fired. """ - fired_timers = self._executor.evaluation_context.extract_fired_timers() - for applied_ptransform in fired_timers: + transform_fired_timers = ( + self._executor.evaluation_context.extract_fired_timers()) + for applied_ptransform, fired_timers in transform_fired_timers: # Use an empty committed bundle. just to trigger. empty_bundle = ( self._executor.evaluation_context.create_empty_committed_bundle( applied_ptransform.inputs[0])) timer_completion_callback = _CompletionCallback( self._executor.evaluation_context, self._executor.all_updates, - applied_ptransform) + timer_firings=fired_timers) self._executor.schedule_consumption( - applied_ptransform, empty_bundle, timer_completion_callback) - return bool(fired_timers) + applied_ptransform, empty_bundle, fired_timers, + timer_completion_callback) + return bool(transform_fired_timers) def _is_executing(self): """Returns True if there is at least one non-blocked TransformExecutor.""" @@ -582,6 +589,6 @@ class _ExecutorServiceParallelExecutor(object): applied_ptransform, []) for bundle in pending_bundles: self._executor.schedule_consumption( - applied_ptransform, bundle, + applied_ptransform, bundle, [], self._executor.default_completion_callback) self._executor.node_to_pending_bundles[applied_ptransform] = [] http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 6e73561..e92d799 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -28,13 +28,15 @@ import apache_beam.io as io from apache_beam.runners.common import DoFnRunner from apache_beam.runners.common import DoFnState from apache_beam.runners.direct.watermark_manager import WatermarkManager -from apache_beam.runners.direct.transform_result import TransformResult +from apache_beam.runners.direct.util import KeyedWorkItem +from apache_beam.runners.direct.util import TransformResult from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite # pylint: disable=protected-access from apache_beam.transforms import core from apache_beam.transforms.window import GlobalWindows from apache_beam.transforms.window import WindowedValue from apache_beam.transforms.trigger import _CombiningValueStateTag from apache_beam.transforms.trigger import _ListStateTag +from apache_beam.transforms.trigger import TimeDomain from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn from apache_beam.typehints.typecheck import TypeCheckError from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn @@ -199,6 +201,25 @@ class _TransformEvaluator(object): """Starts a new bundle.""" pass + def process_timer_wrapper(self, timer_firing): + """Process timer by clearing and then calling process_timer(). + + This method is called with any timer firing and clears the delivered + timer from the keyed state and then calls process_timer(). The default + process_timer() implementation emits a KeyedWorkItem for the particular + timer and passes it to process_element(). Evaluator subclasses which + desire different timer delivery semantics can override process_timer(). + """ + state = self.step_context.get_keyed_state(timer_firing.key) + state.clear_timer( + timer_firing.window, timer_firing.name, timer_firing.time_domain) + self.process_timer(timer_firing) + + def process_timer(self, timer_firing): + """Default process_timer() impl. generating KeyedWorkItem element.""" + self.process_element( + KeyedWorkItem(timer_firing.key, timer_firing=timer_firing)) + def process_element(self, element): """Processes a new element as part of the current bundle.""" raise NotImplementedError('%s do not process elements.', type(self)) @@ -244,7 +265,7 @@ class _BoundedReadEvaluator(_TransformEvaluator): bundles = _read_values_to_bundles(reader) return TransformResult( - self._applied_ptransform, bundles, None, None, None) + self._applied_ptransform, bundles, None, None) class _FlattenEvaluator(_TransformEvaluator): @@ -268,7 +289,7 @@ class _FlattenEvaluator(_TransformEvaluator): def finish_bundle(self): bundles = [self.bundle] return TransformResult( - self._applied_ptransform, bundles, None, None, None) + self._applied_ptransform, bundles, None, None) class _TaggedReceivers(dict): @@ -357,7 +378,7 @@ class _ParDoEvaluator(_TransformEvaluator): bundles = self._tagged_receivers.values() result_counters = self._counter_factory.get_counters() return TransformResult( - self._applied_ptransform, bundles, None, result_counters, None, + self._applied_ptransform, bundles, result_counters, None, self._tagged_receivers.undeclared_in_memory_tag_values) @@ -375,7 +396,6 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator): evaluation_context, applied_ptransform, input_committed_bundle, side_inputs, scoped_metrics_container) - @property def _is_final_bundle(self): return (self._execution_context.watermarks.input_watermark == WatermarkManager.WATERMARK_POS_INF) @@ -392,6 +412,10 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator): self._applied_ptransform.transform.get_type_hints().input_types[0]) self.key_coder = coders.registry.get_coder(kv_type_hint[0].tuple_types[0]) + def process_timer(self, timer_firing): + # We do not need to emit a KeyedWorkItem to process_element(). + pass + def process_element(self, element): assert not self.global_state.get_state( None, _GroupByKeyOnlyEvaluator.COMPLETION_TAG) @@ -408,7 +432,7 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator): % element) def finish_bundle(self): - if self._is_final_bundle: + if self._is_final_bundle(): if self.global_state.get_state( None, _GroupByKeyOnlyEvaluator.COMPLETION_TAG): # Ignore empty bundles after emitting output. (This may happen because @@ -441,9 +465,11 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator): else: bundles = [] hold = WatermarkManager.WATERMARK_NEG_INF + self.global_state.set_timer( + None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF) return TransformResult( - self._applied_ptransform, bundles, None, None, hold) + self._applied_ptransform, bundles, None, hold) class _NativeWriteEvaluator(_TransformEvaluator): @@ -475,6 +501,10 @@ class _NativeWriteEvaluator(_TransformEvaluator): self.step_context = self._execution_context.get_step_context() self.global_state = self.step_context.get_keyed_state(None) + def process_timer(self, timer_firing): + # We do not need to emit a KeyedWorkItem to process_element(). + pass + def process_element(self, element): self.global_state.add_state( None, _NativeWriteEvaluator.ELEMENTS_TAG, element) @@ -500,6 +530,8 @@ class _NativeWriteEvaluator(_TransformEvaluator): hold = WatermarkManager.WATERMARK_POS_INF else: hold = WatermarkManager.WATERMARK_NEG_INF + self.global_state.set_timer( + None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF) return TransformResult( - self._applied_ptransform, [], None, None, hold) + self._applied_ptransform, [], None, hold) http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/transform_result.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_result.py b/sdks/python/apache_beam/runners/direct/transform_result.py deleted file mode 100644 index 51593e3..0000000 --- a/sdks/python/apache_beam/runners/direct/transform_result.py +++ /dev/null @@ -1,40 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""The result of evaluating an AppliedPTransform with a TransformEvaluator.""" - -from __future__ import absolute_import - - -class TransformResult(object): - """For internal use only; no backwards-compatibility guarantees. - - The result of evaluating an AppliedPTransform with a TransformEvaluator.""" - - def __init__(self, applied_ptransform, uncommitted_output_bundles, - timer_update, counters, watermark_hold, - undeclared_tag_values=None): - self.transform = applied_ptransform - self.uncommitted_output_bundles = uncommitted_output_bundles - # TODO: timer update is currently unused. - self.timer_update = timer_update - self.counters = counters - self.watermark_hold = watermark_hold - # Only used when caching (materializing) all values is requested. - self.undeclared_tag_values = undeclared_tag_values - # Populated by the TransformExecutor. - self.logical_metric_updates = None http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/util.py b/sdks/python/apache_beam/runners/direct/util.py new file mode 100644 index 0000000..daaaceb --- /dev/null +++ b/sdks/python/apache_beam/runners/direct/util.py @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utility classes used by the DirectRunner. + +For internal use only. No backwards compatibility guarantees. +""" + +from __future__ import absolute_import + + +class TransformResult(object): + """Result of evaluating an AppliedPTransform with a TransformEvaluator.""" + + def __init__(self, applied_ptransform, uncommitted_output_bundles, + counters, watermark_hold, undeclared_tag_values=None): + self.transform = applied_ptransform + self.uncommitted_output_bundles = uncommitted_output_bundles + self.counters = counters + self.watermark_hold = watermark_hold + # Only used when caching (materializing) all values is requested. + self.undeclared_tag_values = undeclared_tag_values + # Populated by the TransformExecutor. + self.logical_metric_updates = None + + +class TimerFiring(object): + """A single instance of a fired timer.""" + + def __init__(self, key, window, name, time_domain, timestamp): + self.key = key + self.window = window + self.name = name + self.time_domain = time_domain + self.timestamp = timestamp + + +class KeyedWorkItem(object): + """A keyed item that can either be a timer firing or a list of elements.""" + def __init__(self, key, timer_firing=None, elements=None): + self.key = key + assert not timer_firing and elements + self.timer_firing = timer_firing + self.elements = elements http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/runners/direct/watermark_manager.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py index 0d7cd4f..10d25d7 100644 --- a/sdks/python/apache_beam/runners/direct/watermark_manager.py +++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py @@ -23,6 +23,7 @@ import threading from apache_beam import pipeline from apache_beam import pvalue +from apache_beam.runners.direct.util import TimerFiring from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP from apache_beam.utils.timestamp import TIME_GRANULARITY @@ -36,21 +37,23 @@ class WatermarkManager(object): WATERMARK_POS_INF = MAX_TIMESTAMP WATERMARK_NEG_INF = MIN_TIMESTAMP - def __init__(self, clock, root_transforms, value_to_consumers): + def __init__(self, clock, root_transforms, value_to_consumers, + transform_keyed_states): self._clock = clock # processing time clock - self._value_to_consumers = value_to_consumers self._root_transforms = root_transforms + self._value_to_consumers = value_to_consumers + self._transform_keyed_states = transform_keyed_states # AppliedPTransform -> TransformWatermarks self._transform_to_watermarks = {} for root_transform in root_transforms: self._transform_to_watermarks[root_transform] = _TransformWatermarks( - self._clock) + self._clock, transform_keyed_states[root_transform], root_transform) for consumers in value_to_consumers.values(): for consumer in consumers: self._transform_to_watermarks[consumer] = _TransformWatermarks( - self._clock) + self._clock, transform_keyed_states[consumer], consumer) for consumers in value_to_consumers.values(): for consumer in consumers: @@ -90,16 +93,17 @@ class WatermarkManager(object): return self._transform_to_watermarks[applied_ptransform] def update_watermarks(self, completed_committed_bundle, applied_ptransform, - timer_update, outputs, earliest_hold): + completed_timers, outputs, earliest_hold): assert isinstance(applied_ptransform, pipeline.AppliedPTransform) self._update_pending( - completed_committed_bundle, applied_ptransform, timer_update, outputs) + completed_committed_bundle, applied_ptransform, completed_timers, + outputs) tw = self.get_watermarks(applied_ptransform) tw.hold(earliest_hold) self._refresh_watermarks(applied_ptransform) def _update_pending(self, input_committed_bundle, applied_ptransform, - timer_update, output_committed_bundles): + completed_timers, output_committed_bundles): """Updated list of pending bundles for the given AppliedPTransform.""" # Update pending elements. Filter out empty bundles. They do not impact @@ -113,7 +117,7 @@ class WatermarkManager(object): consumer_tw.add_pending(output) completed_tw = self._transform_to_watermarks[applied_ptransform] - completed_tw.update_timers(timer_update) + completed_tw.update_timers(completed_timers) assert input_committed_bundle or applied_ptransform in self._root_transforms if input_committed_bundle and input_committed_bundle.has_elements(): @@ -137,33 +141,37 @@ class WatermarkManager(object): def extract_fired_timers(self): all_timers = [] for applied_ptransform, tw in self._transform_to_watermarks.iteritems(): - if tw.extract_fired_timers(): - all_timers.append(applied_ptransform) + fired_timers = tw.extract_fired_timers() + if fired_timers: + all_timers.append((applied_ptransform, fired_timers)) return all_timers class _TransformWatermarks(object): - """Tracks input and output watermarks for aan AppliedPTransform.""" + """Tracks input and output watermarks for an AppliedPTransform.""" - def __init__(self, clock): + def __init__(self, clock, keyed_states, transform): self._clock = clock + self._keyed_states = keyed_states self._input_transform_watermarks = [] self._input_watermark = WatermarkManager.WATERMARK_NEG_INF self._output_watermark = WatermarkManager.WATERMARK_NEG_INF self._earliest_hold = WatermarkManager.WATERMARK_POS_INF self._pending = set() # Scheduled bundles targeted for this transform. - self._fired_timers = False + self._fired_timers = set() self._lock = threading.Lock() + self._label = str(transform) + def update_input_transform_watermarks(self, input_transform_watermarks): with self._lock: self._input_transform_watermarks = input_transform_watermarks - def update_timers(self, timer_update): + def update_timers(self, completed_timers): with self._lock: - if timer_update: - assert self._fired_timers - self._fired_timers = False + for timer_firing in completed_timers: + print 'REMOVE', timer_firing + self._fired_timers.remove(timer_firing) @property def input_watermark(self): @@ -233,8 +241,12 @@ class _TransformWatermarks(object): if self._fired_timers: return False - should_fire = ( - self._earliest_hold < WatermarkManager.WATERMARK_POS_INF and - self._input_watermark == WatermarkManager.WATERMARK_POS_INF) - self._fired_timers = should_fire - return should_fire + fired_timers = [] + for key, state in self._keyed_states.iteritems(): + timers = state.get_timers(watermark=self._input_watermark) + for expired in timers: + window, (name, time_domain, timestamp) = expired + fired_timers.append( + TimerFiring(key, window, name, time_domain, timestamp)) + self._fired_timers.update(fired_timers) + return fired_timers http://git-wip-us.apache.org/repos/asf/beam/blob/56041b78/sdks/python/apache_beam/transforms/trigger.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py index 89c6ec5..7ff44fa 100644 --- a/sdks/python/apache_beam/transforms/trigger.py +++ b/sdks/python/apache_beam/transforms/trigger.py @@ -1102,17 +1102,21 @@ class InMemoryUnmergedState(UnmergedState): if not self.state[window]: self.state.pop(window, None) - def get_and_clear_timers(self, watermark=MAX_TIMESTAMP): + def get_timers(self, clear=False, watermark=MAX_TIMESTAMP): expired = [] for window, timers in list(self.timers.items()): for (name, time_domain), timestamp in list(timers.items()): if timestamp <= watermark: expired.append((window, (name, time_domain, timestamp))) - del timers[(name, time_domain)] - if not timers: + if clear: + del timers[(name, time_domain)] + if not timers and clear: del self.timers[window] return expired + def get_and_clear_timers(self, watermark=MAX_TIMESTAMP): + return self.get_timers(clear=True, watermark=watermark) + def __repr__(self): state_str = '\n'.join('%s: %s' % (key, dict(state)) for key, state in self.state.items())
