[ https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81917&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81917 ]
ASF GitHub Bot logged work on BEAM-3818: ---------------------------------------- Author: ASF GitHub Bot Created on: 19/Mar/18 16:51 Start Date: 19/Mar/18 16:51 Worklog Time Spent: 10m Work Description: aaltay closed pull request #4838: [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part I: update _SideInputsContainer as the watermark advances) URL: https://github.com/apache/beam/pull/4838 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py index 46176c9e969..d0ab55f5462 100644 --- a/sdks/python/apache_beam/runners/direct/evaluation_context.py +++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py @@ -56,6 +56,11 @@ def __init__(self, view): self.value = None self.has_result = False + def __repr__(self): + elements_string = (', '.join(str(elm) for elm in self.elements) + if self.elements else '[]') + return '_SideInputView(elements=%s)' % elements_string + class _SideInputsContainer(object): """An in-process container for side inputs. 
@@ -67,8 +72,16 @@ class _SideInputsContainer(object): def __init__(self, views): self._lock = threading.Lock() self._views = {} + self._transform_to_views = collections.defaultdict(list) + for view in views: self._views[view] = _SideInputView(view) + self._transform_to_views[view.pvalue.producer].append(view) + + def __repr__(self): + views_string = (', '.join(str(elm) for elm in self._views.values()) + if self._views.values() else '[]') + return '_SideInputsContainer(_views=%s)' % views_string def get_value_or_schedule_after_output(self, side_input, task): with self._lock: @@ -99,6 +112,19 @@ def finalize_value_and_get_tasks(self, side_input): view.has_result = True return result + def update_watermarks_for_transform(self, ptransform, watermark): + # Collect tasks that get unblocked as the workflow progresses. + unblocked_tasks = [] + for view in self._transform_to_views[ptransform]: + unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark)) + return unblocked_tasks + + def _update_watermarks_for_view(self, view, watermark): + unblocked_tasks = [] + if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF: + unblocked_tasks = self.finalize_value_and_get_tasks(view) + return unblocked_tasks + def _pvalue_to_value(self, view, values): """Given a side input view, returns the associated value in requested form. 
@@ -149,10 +175,10 @@ def __init__(self, pipeline_options, bundle_factory, root_transforms, self._pcollection_to_views[view.pvalue].append(view) self._transform_keyed_states = self._initialize_keyed_states( root_transforms, value_to_consumers) + self._side_inputs_container = _SideInputsContainer(views) self._watermark_manager = WatermarkManager( clock, root_transforms, value_to_consumers, self._transform_keyed_states) - self._side_inputs_container = _SideInputsContainer(views) self._pending_unblocked_tasks = [] self._counter_factory = counters.CounterFactory() self._metrics = DirectMetrics() @@ -199,9 +225,6 @@ def handle_result( committed_bundles, unprocessed_bundles = self._commit_bundles( result.uncommitted_output_bundles, result.unprocessed_bundles) - self._watermark_manager.update_watermarks( - completed_bundle, result.transform, completed_timers, - committed_bundles, unprocessed_bundles, result.keyed_watermark_holds) self._metrics.commit_logical(completed_bundle, result.logical_metric_updates) @@ -217,11 +240,13 @@ def handle_result( self._side_inputs_container.add_values( view, committed_bundle.get_elements_iterable(make_copy=True)) - if (self.get_execution_context(result.transform) - .watermarks.input_watermark - == WatermarkManager.WATERMARK_POS_INF): - self._pending_unblocked_tasks.extend( - self._side_inputs_container.finalize_value_and_get_tasks(view)) + + # Tasks generated from unblocked side inputs as the watermark progresses. 
+ tasks = self._watermark_manager.update_watermarks( + completed_bundle, result.transform, completed_timers, + committed_bundles, unprocessed_bundles, result.keyed_watermark_holds, + self._side_inputs_container) + self._pending_unblocked_tasks.extend(tasks) if result.counters: for counter in result.counters: diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py index 084073f4fe7..74a021674f8 100644 --- a/sdks/python/apache_beam/runners/direct/watermark_manager.py +++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py @@ -94,14 +94,14 @@ def get_watermarks(self, applied_ptransform): def update_watermarks(self, completed_committed_bundle, applied_ptransform, completed_timers, outputs, unprocessed_bundles, - keyed_earliest_holds): + keyed_earliest_holds, side_inputs_container): assert isinstance(applied_ptransform, pipeline.AppliedPTransform) self._update_pending( completed_committed_bundle, applied_ptransform, completed_timers, outputs, unprocessed_bundles) tw = self.get_watermarks(applied_ptransform) tw.hold(keyed_earliest_holds) - self._refresh_watermarks(applied_ptransform) + return self._refresh_watermarks(applied_ptransform, side_inputs_container) def _update_pending(self, input_committed_bundle, applied_ptransform, completed_timers, output_committed_bundles, @@ -128,8 +128,9 @@ def _update_pending(self, input_committed_bundle, applied_ptransform, if input_committed_bundle and input_committed_bundle.has_elements(): completed_tw.remove_pending(input_committed_bundle) - def _refresh_watermarks(self, applied_ptransform): + def _refresh_watermarks(self, applied_ptransform, side_inputs_container): assert isinstance(applied_ptransform, pipeline.AppliedPTransform) + unblocked_tasks = [] tw = self.get_watermarks(applied_ptransform) if tw.refresh(): for pval in applied_ptransform.outputs.values(): @@ -141,7 +142,12 @@ def _refresh_watermarks(self, applied_ptransform): if v in 
self._value_to_consumers: # If there are downstream consumers consumers = self._value_to_consumers[v] for consumer in consumers: - self._refresh_watermarks(consumer) + unblocked_tasks.extend( + self._refresh_watermarks(consumer, side_inputs_container)) + unblocked_tasks.extend( + side_inputs_container.update_watermarks_for_transform( + applied_ptransform, tw)) + return unblocked_tasks def extract_all_timers(self): """Extracts fired timers for all transforms diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py index a3f2413f167..3beeb4b2825 100644 --- a/sdks/python/apache_beam/testing/test_stream_test.py +++ b/sdks/python/apache_beam/testing/test_stream_test.py @@ -30,9 +30,11 @@ from apache_beam.testing.util import assert_that from apache_beam.testing.util import equal_to from apache_beam.transforms import trigger +from apache_beam.transforms import window from apache_beam.transforms.window import FixedWindows from apache_beam.transforms.window import TimestampedValue from apache_beam.utils import timestamp +from apache_beam.utils.timestamp import Timestamp from apache_beam.utils.windowed_value import WindowedValue @@ -245,6 +247,84 @@ def fired_elements(elem): # TODO(BEAM-3377): Remove after assert_that in streaming is fixed. self.assertEqual([('k', ['a'])], result) + def test_basic_execution_sideinputs_batch(self): + + # TODO(BEAM-3377): Remove after assert_that in streaming is fixed. 
+ global result # pylint: disable=global-variable-undefined + result = [] + + def recorded_elements(elem): + result.append(elem) + return elem + + options = PipelineOptions() + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) + + main_stream = (p + | 'main TestStream' >> TestStream() + .advance_watermark_to(10) + .add_elements(['e'])) + side = (p + | beam.Create([2, 1, 4]) + | beam.Map(lambda t: window.TimestampedValue(t, t))) + + class RecordFn(beam.DoFn): + def process(self, + elm=beam.DoFn.ElementParam, + ts=beam.DoFn.TimestampParam, + side=beam.DoFn.SideInputParam): + yield (elm, ts, side) + + records = (main_stream # pylint: disable=unused-variable + | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)) + | beam.Map(recorded_elements)) + p.run() + + # TODO(BEAM-3377): Remove after assert_that in streaming is fixed. + self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result) + + def test_basic_execution_sideinputs(self): + + # TODO(BEAM-3377): Remove after assert_that in streaming is fixed. 
+ global result # pylint: disable=global-variable-undefined + result = [] + + def recorded_elements(elem): + result.append(elem) + return elem + + options = PipelineOptions() + options.view_as(StandardOptions).streaming = True + p = TestPipeline(options=options) + + main_stream = (p + | 'main TestStream' >> TestStream() + .advance_watermark_to(10) + .add_elements(['e']) + .advance_processing_time(11)) + side_stream = (p + | 'side TestStream' >> TestStream() + .add_elements([window.TimestampedValue(2, 2)]) + .add_elements([window.TimestampedValue(1, 1)]) + .add_elements([window.TimestampedValue(4, 4)])) + + class RecordFn(beam.DoFn): + def process(self, + elm=beam.DoFn.ElementParam, + ts=beam.DoFn.TimestampParam, + side=beam.DoFn.SideInputParam): + yield (elm, ts, side) + + records = (main_stream # pylint: disable=unused-variable + | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream)) + | beam.Map(recorded_elements)) + + p.run() + + # TODO(BEAM-3377): Remove after assert_that in streaming is fixed. + self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result) + if __name__ == '__main__': unittest.main() ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

Issue Time Tracking
-------------------

Worklog Id: (was: 81917)
Time Spent: 4h 40m (was: 4.5h)

> Add support for the streaming side inputs in the Python DirectRunner
> --------------------------------------------------------------------
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: María GH
> Assignee: María GH
> Priority: Minor
> Fix For: 3.0.0
>
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.
> Currently, side inputs are only available for globally-windowed side input
> PCollections.
> Also, empty side inputs cause a pipeline stall.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)