Repository: beam
Updated Branches:
  refs/heads/master 28c6fd42e -> f0467b72f
Allow production of unprocessed bundles, introduce TestStream evaluator in DirectRunner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3520f948 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3520f948 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3520f948 Branch: refs/heads/master Commit: 3520f94882b00aa8db64f6379044689d1b78ac06 Parents: 28c6fd4 Author: Charles Chen <c...@google.com> Authored: Tue Jun 20 17:16:20 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Wed Jun 21 13:44:05 2017 -0700 ---------------------------------------------------------------------- .../runners/direct/evaluation_context.py | 14 ++-- .../apache_beam/runners/direct/executor.py | 40 +++++++-- .../runners/direct/transform_evaluator.py | 88 ++++++++++++++++++-- sdks/python/apache_beam/runners/direct/util.py | 4 +- .../runners/direct/watermark_manager.py | 11 ++- sdks/python/apache_beam/testing/test_stream.py | 5 ++ .../apache_beam/testing/test_stream_test.py | 37 ++++++++ 7 files changed, 176 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/evaluation_context.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 976e9e8..669a68a 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -208,11 +208,12 @@ class EvaluationContext(object): the committed bundles contained within the handled result. 
""" with self._lock: - committed_bundles = self._commit_bundles( - result.uncommitted_output_bundles) + committed_bundles, unprocessed_bundles = self._commit_bundles( + result.uncommitted_output_bundles, + result.unprocessed_bundles) self._watermark_manager.update_watermarks( completed_bundle, result.transform, completed_timers, - committed_bundles, result.watermark_hold) + committed_bundles, unprocessed_bundles, result.watermark_hold) self._metrics.commit_logical(completed_bundle, result.logical_metric_updates) @@ -252,14 +253,17 @@ class EvaluationContext(object): executor_service.submit(task) self._pending_unblocked_tasks = [] - def _commit_bundles(self, uncommitted_bundles): + def _commit_bundles(self, uncommitted_bundles, unprocessed_bundles): """Commits bundles and returns a immutable set of committed bundles.""" for in_progress_bundle in uncommitted_bundles: producing_applied_ptransform = in_progress_bundle.pcollection.producer watermarks = self._watermark_manager.get_watermarks( producing_applied_ptransform) in_progress_bundle.commit(watermarks.synchronized_processing_output_time) - return tuple(uncommitted_bundles) + + for unprocessed_bundle in unprocessed_bundles: + unprocessed_bundle.commit(None) + return tuple(uncommitted_bundles), tuple(unprocessed_bundles) def get_execution_context(self, applied_ptransform): return _ExecutionContext( http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/executor.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py index a0a3886..e70e326 100644 --- a/sdks/python/apache_beam/runners/direct/executor.py +++ b/sdks/python/apache_beam/runners/direct/executor.py @@ -227,17 +227,25 @@ class _CompletionCallback(object): self._all_updates = all_updates self._timer_firings = timer_firings or [] - def handle_result(self, input_committed_bundle, 
transform_result): + def handle_result(self, transform_executor, input_committed_bundle, + transform_result): output_committed_bundles = self._evaluation_context.handle_result( input_committed_bundle, self._timer_firings, transform_result) for output_committed_bundle in output_committed_bundles: self._all_updates.offer(_ExecutorServiceParallelExecutor._ExecutorUpdate( - output_committed_bundle, None)) + transform_executor, + committed_bundle=output_committed_bundle)) + for unprocessed_bundle in transform_result.unprocessed_bundles: + self._all_updates.offer( + _ExecutorServiceParallelExecutor._ExecutorUpdate( + transform_executor, + unprocessed_bundle=unprocessed_bundle)) return output_committed_bundles - def handle_exception(self, exception): + def handle_exception(self, transform_executor, exception): self._all_updates.offer( - _ExecutorServiceParallelExecutor._ExecutorUpdate(None, exception)) + _ExecutorServiceParallelExecutor._ExecutorUpdate( + transform_executor, exception=exception)) class TransformExecutor(_ExecutorService.CallableTask): @@ -312,10 +320,10 @@ class TransformExecutor(_ExecutorService.CallableTask): self._evaluation_context.append_to_cache( self._applied_ptransform, tag, value) - self._completion_callback.handle_result(self._input_bundle, result) + self._completion_callback.handle_result(self, self._input_bundle, result) return result except Exception as e: # pylint: disable=broad-except - self._completion_callback.handle_exception(e) + self._completion_callback.handle_exception(self, e) finally: self._evaluation_context.metrics().commit_physical( self._input_bundle, @@ -387,6 +395,10 @@ class _ExecutorServiceParallelExecutor(object): self.schedule_consumption(applied_ptransform, committed_bundle, [], self.default_completion_callback) + def schedule_unprocessed_bundle(self, applied_ptransform, + unprocessed_bundle): + self.node_to_pending_bundles[applied_ptransform].append(unprocessed_bundle) + def schedule_consumption(self, 
consumer_applied_ptransform, committed_bundle, fired_timers, on_complete): """Schedules evaluation of the given bundle with the transform.""" @@ -433,10 +445,16 @@ class _ExecutorServiceParallelExecutor(object): class _ExecutorUpdate(object): """An internal status update on the state of the executor.""" - def __init__(self, produced_bundle=None, exception=None): + def __init__(self, transform_executor, committed_bundle=None, + unprocessed_bundle=None, exception=None): + self.transform_executor = transform_executor # Exactly one of them should be not-None - assert bool(produced_bundle) != bool(exception) - self.committed_bundle = produced_bundle + assert sum([ + bool(committed_bundle), + bool(unprocessed_bundle), + bool(exception)]) == 1 + self.committed_bundle = committed_bundle + self.unprocessed_bundle = unprocessed_bundle self.exception = exception self.exc_info = sys.exc_info() if self.exc_info[1] is not exception: @@ -471,6 +489,10 @@ class _ExecutorServiceParallelExecutor(object): while update: if update.committed_bundle: self._executor.schedule_consumers(update.committed_bundle) + elif update.unprocessed_bundle: + self._executor.schedule_unprocessed_bundle( + update.transform_executor._applied_ptransform, + update.unprocessed_bundle) else: assert update.exception logging.warning('A task failed with exception.\n %s', http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/transform_evaluator.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index e92d799..3aefbb8 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -31,6 +31,10 @@ from apache_beam.runners.direct.watermark_manager import WatermarkManager from apache_beam.runners.direct.util import KeyedWorkItem from 
apache_beam.runners.direct.util import TransformResult from apache_beam.runners.dataflow.native_io.iobase import _NativeWrite # pylint: disable=protected-access +from apache_beam.testing.test_stream import TestStream +from apache_beam.testing.test_stream import ElementEvent +from apache_beam.testing.test_stream import WatermarkEvent +from apache_beam.testing.test_stream import ProcessingTimeEvent from apache_beam.transforms import core from apache_beam.transforms.window import GlobalWindows from apache_beam.transforms.window import WindowedValue @@ -41,6 +45,7 @@ from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn from apache_beam.typehints.typecheck import TypeCheckError from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn from apache_beam.utils import counters +from apache_beam.utils.timestamp import MIN_TIMESTAMP from apache_beam.options.pipeline_options import TypeOptions @@ -59,9 +64,11 @@ class TransformEvaluatorRegistry(object): core.ParDo: _ParDoEvaluator, core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator, _NativeWrite: _NativeWriteEvaluator, + TestStream: _TestStreamEvaluator, } self._root_bundle_providers = { core.PTransform: DefaultRootBundleProvider, + TestStream: _TestStreamRootBundleProvider, } def get_evaluator( @@ -142,6 +149,23 @@ class DefaultRootBundleProvider(RootBundleProvider): return [empty_bundle] +class _TestStreamRootBundleProvider(RootBundleProvider): + """Provides an initial bundle for the TestStream evaluator.""" + + def get_root_bundles(self): + test_stream = self._applied_ptransform.transform + bundles = [] + if len(test_stream.events) > 0: + bundle = self._evaluation_context.create_bundle( + pvalue.PBegin(self._applied_ptransform.transform.pipeline)) + # Explicitly set timestamp to MIN_TIMESTAMP to ensure that we hold the + # watermark. 
+ bundle.add(GlobalWindows.windowed_value(0, timestamp=MIN_TIMESTAMP)) + bundle.commit(None) + bundles.append(bundle) + return bundles + + class _TransformEvaluator(object): """An evaluator of a specific application of a transform.""" @@ -265,7 +289,61 @@ class _BoundedReadEvaluator(_TransformEvaluator): bundles = _read_values_to_bundles(reader) return TransformResult( - self._applied_ptransform, bundles, None, None) + self._applied_ptransform, bundles, [], None, None) + + +class _TestStreamEvaluator(_TransformEvaluator): + """TransformEvaluator for the TestStream transform.""" + + def __init__(self, evaluation_context, applied_ptransform, + input_committed_bundle, side_inputs, scoped_metrics_container): + assert not side_inputs + self.test_stream = applied_ptransform.transform + super(_TestStreamEvaluator, self).__init__( + evaluation_context, applied_ptransform, input_committed_bundle, + side_inputs, scoped_metrics_container) + + def start_bundle(self): + self.current_index = -1 + self.watermark = MIN_TIMESTAMP + self.bundles = [] + + def process_element(self, element): + index = element.value + self.watermark = element.timestamp + assert isinstance(index, int) + assert 0 <= index <= len(self.test_stream.events) + self.current_index = index + event = self.test_stream.events[self.current_index] + if isinstance(event, ElementEvent): + assert len(self._outputs) == 1 + output_pcollection = list(self._outputs)[0] + bundle = self._evaluation_context.create_bundle(output_pcollection) + for tv in event.timestamped_values: + bundle.output( + GlobalWindows.windowed_value(tv.value, timestamp=tv.timestamp)) + self.bundles.append(bundle) + elif isinstance(event, WatermarkEvent): + assert event.new_watermark >= self.watermark + self.watermark = event.new_watermark + elif isinstance(event, ProcessingTimeEvent): + # TODO(ccy): advance processing time in the context's mock clock. + pass + else: + raise ValueError('Invalid TestStream event: %s.' 
% event) + + def finish_bundle(self): + unprocessed_bundles = [] + hold = None + if self.current_index < len(self.test_stream.events) - 1: + unprocessed_bundle = self._evaluation_context.create_bundle( + pvalue.PBegin(self._applied_ptransform.transform.pipeline)) + unprocessed_bundle.add(GlobalWindows.windowed_value( + self.current_index + 1, timestamp=self.watermark)) + unprocessed_bundles.append(unprocessed_bundle) + hold = self.watermark + return TransformResult( + self._applied_ptransform, self.bundles, unprocessed_bundles, None, hold) class _FlattenEvaluator(_TransformEvaluator): @@ -289,7 +367,7 @@ class _FlattenEvaluator(_TransformEvaluator): def finish_bundle(self): bundles = [self.bundle] return TransformResult( - self._applied_ptransform, bundles, None, None) + self._applied_ptransform, bundles, [], None, None) class _TaggedReceivers(dict): @@ -378,7 +456,7 @@ class _ParDoEvaluator(_TransformEvaluator): bundles = self._tagged_receivers.values() result_counters = self._counter_factory.get_counters() return TransformResult( - self._applied_ptransform, bundles, result_counters, None, + self._applied_ptransform, bundles, [], result_counters, None, self._tagged_receivers.undeclared_in_memory_tag_values) @@ -469,7 +547,7 @@ class _GroupByKeyOnlyEvaluator(_TransformEvaluator): None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF) return TransformResult( - self._applied_ptransform, bundles, None, hold) + self._applied_ptransform, bundles, [], None, hold) class _NativeWriteEvaluator(_TransformEvaluator): @@ -534,4 +612,4 @@ class _NativeWriteEvaluator(_TransformEvaluator): None, '', TimeDomain.WATERMARK, WatermarkManager.WATERMARK_POS_INF) return TransformResult( - self._applied_ptransform, [], None, hold) + self._applied_ptransform, [], [], None, hold) http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/util.py ---------------------------------------------------------------------- diff --git 
a/sdks/python/apache_beam/runners/direct/util.py b/sdks/python/apache_beam/runners/direct/util.py index daaaceb..8c846fc 100644 --- a/sdks/python/apache_beam/runners/direct/util.py +++ b/sdks/python/apache_beam/runners/direct/util.py @@ -27,9 +27,11 @@ class TransformResult(object): """Result of evaluating an AppliedPTransform with a TransformEvaluator.""" def __init__(self, applied_ptransform, uncommitted_output_bundles, - counters, watermark_hold, undeclared_tag_values=None): + unprocessed_bundles, counters, watermark_hold, + undeclared_tag_values=None): self.transform = applied_ptransform self.uncommitted_output_bundles = uncommitted_output_bundles + self.unprocessed_bundles = unprocessed_bundles self.counters = counters self.watermark_hold = watermark_hold # Only used when caching (materializing) all values is requested. http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/runners/direct/watermark_manager.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py index 10d25d7..2146bb5 100644 --- a/sdks/python/apache_beam/runners/direct/watermark_manager.py +++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py @@ -93,17 +93,19 @@ class WatermarkManager(object): return self._transform_to_watermarks[applied_ptransform] def update_watermarks(self, completed_committed_bundle, applied_ptransform, - completed_timers, outputs, earliest_hold): + completed_timers, outputs, unprocessed_bundles, + earliest_hold): assert isinstance(applied_ptransform, pipeline.AppliedPTransform) self._update_pending( completed_committed_bundle, applied_ptransform, completed_timers, - outputs) + outputs, unprocessed_bundles) tw = self.get_watermarks(applied_ptransform) tw.hold(earliest_hold) self._refresh_watermarks(applied_ptransform) def _update_pending(self, input_committed_bundle, 
applied_ptransform, - completed_timers, output_committed_bundles): + completed_timers, output_committed_bundles, + unprocessed_bundles): """Updated list of pending bundles for the given AppliedPTransform.""" # Update pending elements. Filter out empty bundles. They do not impact @@ -119,6 +121,9 @@ class WatermarkManager(object): completed_tw = self._transform_to_watermarks[applied_ptransform] completed_tw.update_timers(completed_timers) + for unprocessed_bundle in unprocessed_bundles: + completed_tw.add_pending(unprocessed_bundle) + assert input_committed_bundle or applied_ptransform in self._root_transforms if input_committed_bundle and input_committed_bundle.has_elements(): completed_tw.remove_pending(input_committed_bundle) http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/testing/test_stream.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py index a06bcd0..7989fb2 100644 --- a/sdks/python/apache_beam/testing/test_stream.py +++ b/sdks/python/apache_beam/testing/test_stream.py @@ -24,8 +24,10 @@ from abc import ABCMeta from abc import abstractmethod from apache_beam import coders +from apache_beam import core from apache_beam import pvalue from apache_beam.transforms import PTransform +from apache_beam.transforms import window from apache_beam.transforms.window import TimestampedValue from apache_beam.utils import timestamp from apache_beam.utils.windowed_value import WindowedValue @@ -99,6 +101,9 @@ class TestStream(PTransform): self.current_watermark = timestamp.MIN_TIMESTAMP self.events = [] + def get_windowing(self, unused_inputs): + return core.Windowing(window.GlobalWindows()) + def expand(self, pbegin): assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline http://git-wip-us.apache.org/repos/asf/beam/blob/3520f948/sdks/python/apache_beam/testing/test_stream_test.py 
---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py index e32dda2..bf05ac1 100644 --- a/sdks/python/apache_beam/testing/test_stream_test.py +++ b/sdks/python/apache_beam/testing/test_stream_test.py @@ -19,6 +19,8 @@ import unittest +import apache_beam as beam +from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.test_stream import ElementEvent from apache_beam.testing.test_stream import ProcessingTimeEvent from apache_beam.testing.test_stream import TestStream @@ -78,6 +80,41 @@ class TestStreamTest(unittest.TestCase): TimestampedValue('a', timestamp.MAX_TIMESTAMP) ])) + def test_basic_execution(self): + test_stream = (TestStream() + .advance_watermark_to(10) + .add_elements(['a', 'b', 'c']) + .advance_watermark_to(20) + .add_elements(['d']) + .add_elements(['e']) + .advance_processing_time(10) + .advance_watermark_to(300) + .add_elements([TimestampedValue('late', 12)]) + .add_elements([TimestampedValue('last', 310)])) + + global _seen_elements # pylint: disable=global-variable-undefined + _seen_elements = [] + + class RecordFn(beam.DoFn): + def process(self, element=beam.DoFn.ElementParam, + timestamp=beam.DoFn.TimestampParam): + _seen_elements.append((element, timestamp)) + + p = TestPipeline() + my_record_fn = RecordFn() + p | test_stream | beam.ParDo(my_record_fn) # pylint: disable=expression-not-assigned + p.run() + + self.assertEqual([ + ('a', timestamp.Timestamp(10)), + ('b', timestamp.Timestamp(10)), + ('c', timestamp.Timestamp(10)), + ('d', timestamp.Timestamp(20)), + ('e', timestamp.Timestamp(20)), + ('late', timestamp.Timestamp(12)), + ('last', timestamp.Timestamp(310)),], _seen_elements) + del _seen_elements + if __name__ == '__main__': unittest.main()