This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fdbd7fa [BEAM-3818] Add support for streaming side inputs in the
DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated)
(#4949)
fdbd7fa is described below
commit fdbd7fafb3a8c26fde340494642f500fc3893f60
Author: mariapython <[email protected]>
AuthorDate: Mon Apr 2 12:08:07 2018 -0700
[BEAM-3818] Add support for streaming side inputs in the DirectRunner (part
2: unblock tasks as the _SideInputsContainer gets updated) (#4949)
Add support for streaming side inputs in the DirectRunner (part 2: unblock
tasks as the _SideInputsContainer gets updated)
---
.../runners/direct/evaluation_context.py | 129 ++++++++++++++-------
sdks/python/apache_beam/runners/direct/executor.py | 21 +++-
.../runners/direct/watermark_manager.py | 4 +-
.../python/apache_beam/testing/test_stream_test.py | 107 ++++++++++++++++-
4 files changed, 208 insertions(+), 53 deletions(-)
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py
b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index d0ab55f..893e32e 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -51,10 +51,10 @@ class _SideInputView(object):
def __init__(self, view):
self._view = view
- self.callable_queue = collections.deque()
+ self.blocked_tasks = collections.deque()
self.elements = []
self.value = None
- self.has_result = False
+ self.watermark = None
def __repr__(self):
elements_string = (', '.join(str(elm) for elm in self.elements)
@@ -69,67 +69,108 @@ class _SideInputsContainer(object):
to a side input.
"""
- def __init__(self, views):
+ def __init__(self, side_inputs):
self._lock = threading.Lock()
self._views = {}
- self._transform_to_views = collections.defaultdict(list)
+ self._transform_to_side_inputs = collections.defaultdict(list)
+ self._side_input_to_blocked_tasks = collections.defaultdict(list)
- for view in views:
- self._views[view] = _SideInputView(view)
- self._transform_to_views[view.pvalue.producer].append(view)
+ for side in side_inputs:
+ self._views[side] = _SideInputView(side)
+ self._transform_to_side_inputs[side.pvalue.producer].append(side)
def __repr__(self):
views_string = (', '.join(str(elm) for elm in self._views.values())
if self._views.values() else '[]')
return '_SideInputsContainer(_views=%s)' % views_string
- def get_value_or_schedule_after_output(self, side_input, task):
+ def get_value_or_block_until_ready(self, side_input, task, block_until):
+ """Returns the value of a view whose task is unblocked or blocks its task.
+
+ It gets the value of a view whose watermark has been updated and
+ surpasses a given value.
+
+ Args:
+ side_input: ``_UnpickledSideInput`` value.
+ task: ``TransformExecutor`` task waiting on a side input.
+ block_until: Timestamp after which the task gets unblocked.
+
+ Returns:
+ The ``SideInputMap`` value of a view when the tasks it blocks are
+ unblocked. Otherwise, None.
+ """
with self._lock:
view = self._views[side_input]
- if not view.has_result:
- view.callable_queue.append(task)
+ if view.watermark and view.watermark.output_watermark >= block_until:
+ view.value = self._pvalue_to_value(side_input, view.elements)
+ return view.value
+ else:
+ view.blocked_tasks.append((task, block_until))
task.blocked = True
- return (view.has_result, view.value)
def add_values(self, side_input, values):
with self._lock:
view = self._views[side_input]
- assert not view.has_result
view.elements.extend(values)
- def finalize_value_and_get_tasks(self, side_input):
- with self._lock:
- view = self._views[side_input]
- assert not view.has_result
- assert view.value is None
- assert view.callable_queue is not None
- view.value = self._pvalue_to_value(side_input, view.elements)
- view.elements = None
- result = tuple(view.callable_queue)
- for task in result:
- task.blocked = False
- view.callable_queue = None
- view.has_result = True
- return result
-
- def update_watermarks_for_transform(self, ptransform, watermark):
- # Collect tasks that get unblocked as the workflow progresses.
- unblocked_tasks = []
- for view in self._transform_to_views[ptransform]:
- unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
- return unblocked_tasks
+ def update_watermarks_for_transform_and_unblock_tasks(self,
+ ptransform,
+ watermark):
+ """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+ It traverses the list of side inputs per PTransform and calls
+ _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
- def _update_watermarks_for_view(self, view, watermark):
+ Args:
+ ptransform: Value of a PTransform.
+ watermark: Value of the watermark after an update for a PTransform.
+
+ Returns:
+ Tasks that get unblocked as a result of the watermark advancing.
+ """
unblocked_tasks = []
- if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
- unblocked_tasks = self.finalize_value_and_get_tasks(view)
+ for side in self._transform_to_side_inputs[ptransform]:
+ unblocked_tasks.extend(
+ self._update_watermarks_for_side_input_and_unblock_tasks(
+ side, watermark))
return unblocked_tasks
- def _pvalue_to_value(self, view, values):
- """Given a side input view, returns the associated value in requested form.
+ def _update_watermarks_for_side_input_and_unblock_tasks(self,
+ side_input,
+ watermark):
+ """Helps update _SideInputsContainer after a watermark update.
+
+ For each view of the side input, it updates the value of the watermark
+ recorded when the watermark moved and unblocks tasks accordingly.
+
+ Args:
+ side_input: ``_UnpickledSideInput`` value.
+ watermark: Value of the watermark after an update for a PTransform.
+
+ Returns:
+ Tasks that get unblocked as a result of the watermark advancing.
+ """
+ with self._lock:
+ view = self._views[side_input]
+ view.watermark = watermark
+
+ unblocked_tasks = []
+ tasks_just_unblocked = []
+ for task, block_until in view.blocked_tasks:
+ if watermark.output_watermark >= block_until:
+ view.value = self._pvalue_to_value(side_input, view.elements)
+ unblocked_tasks.append(task)
+ tasks_just_unblocked.append((task, block_until))
+ task.blocked = False
+ for task in tasks_just_unblocked:
+ view.blocked_tasks.remove(task)
+ return unblocked_tasks
+
+ def _pvalue_to_value(self, side_input, values):
+ """Given a side input, returns the associated value in its requested form.
Args:
- view: SideInput for the requested side input.
+ side_input: _UnpickledSideInput object.
values: Iterable values associated with the side input.
Returns:
@@ -138,7 +179,9 @@ class _SideInputsContainer(object):
Raises:
ValueError: If values cannot be converted into the requested form.
"""
- return sideinputs.SideInputMap(type(view), view._view_options(), values)
+ return sideinputs.SideInputMap(type(side_input),
+ side_input._view_options(),
+ values)
class EvaluationContext(object):
@@ -321,10 +364,10 @@ class EvaluationContext(object):
tw = self._watermark_manager.get_watermarks(transform)
return tw.output_watermark == WatermarkManager.WATERMARK_POS_INF
- def get_value_or_schedule_after_output(self, side_input, task):
+ def get_value_or_block_until_ready(self, side_input, task, block_until):
assert isinstance(task, TransformExecutor)
- return self._side_inputs_container.get_value_or_schedule_after_output(
- side_input, task)
+ return self._side_inputs_container.get_value_or_block_until_ready(
+ side_input, task, block_until)
class DirectUnmergedState(InMemoryUnmergedState):
diff --git a/sdks/python/apache_beam/runners/direct/executor.py
b/sdks/python/apache_beam/runners/direct/executor.py
index 5c9fe84..107f007 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -32,6 +32,7 @@ import six
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import ScopedMetricsContainer
+from apache_beam.transforms import sideinputs
class _ExecutorService(object):
@@ -271,6 +272,13 @@ class TransformExecutor(_ExecutorService.CallableTask):
self._transform_evaluator_registry = transform_evaluator_registry
self._evaluation_context = evaluation_context
self._input_bundle = input_bundle
+ # For non-empty bundles, store the window of the max EOW.
+ # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+ if input_bundle.has_elements():
+ self._latest_main_input_window = input_bundle._elements[0].windows[0]
+ for elem in input_bundle.get_elements_iterable():
+ if elem.windows[0].end > self._latest_main_input_window.end:
+ self._latest_main_input_window = elem.windows[0]
self._fired_timers = fired_timers
self._applied_ptransform = applied_ptransform
self._completion_callback = completion_callback
@@ -288,11 +296,16 @@ class TransformExecutor(_ExecutorService.CallableTask):
scoped_metrics_container = ScopedMetricsContainer(metrics_container)
for side_input in self._applied_ptransform.side_inputs:
+ # Find the projection of main's window onto the side input's window.
+ window_mapping_fn = side_input._view_options().get(
+ 'window_mapping_fn', sideinputs._global_window_mapping_fn)
+ main_onto_side_window = window_mapping_fn(self._latest_main_input_window)
+ block_until = main_onto_side_window.end
+
if side_input not in self._side_input_values:
- has_result, value = (
- self._evaluation_context.get_value_or_schedule_after_output(
- side_input, self))
- if not has_result:
+ value = self._evaluation_context.get_value_or_block_until_ready(
+ side_input, self, block_until)
+ if not value:
# Monitor task will reschedule this executor once the side input is
# available.
return
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py
b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 74a0216..7128ada 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -144,8 +144,10 @@ class WatermarkManager(object):
for consumer in consumers:
unblocked_tasks.extend(
self._refresh_watermarks(consumer, side_inputs_container))
+ # Notify the side_inputs_container.
unblocked_tasks.extend(
- side_inputs_container.update_watermarks_for_transform(
+ side_inputs_container
+ .update_watermarks_for_transform_and_unblock_tasks(
applied_ptransform, tw))
return unblocked_tasks
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py
b/sdks/python/apache_beam/testing/test_stream_test.py
index 3beeb4b..826ef8e 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -247,7 +247,7 @@ class TestStreamTest(unittest.TestCase):
# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
self.assertEqual([('k', ['a'])], result)
- def test_basic_execution_sideinputs_batch(self):
+ def test_basic_execution_batch_sideinputs(self):
# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
global result # pylint: disable=global-variable-undefined
@@ -301,13 +301,14 @@ class TestStreamTest(unittest.TestCase):
main_stream = (p
| 'main TestStream' >> TestStream()
.advance_watermark_to(10)
- .add_elements(['e'])
- .advance_processing_time(11))
+ .add_elements(['e']))
side_stream = (p
| 'side TestStream' >> TestStream()
.add_elements([window.TimestampedValue(2, 2)])
.add_elements([window.TimestampedValue(1, 1)])
- .add_elements([window.TimestampedValue(4, 4)]))
+ .add_elements([window.TimestampedValue(7, 7)])
+ .add_elements([window.TimestampedValue(4, 4)])
+ )
class RecordFn(beam.DoFn):
def process(self,
@@ -323,7 +324,103 @@ class TestStreamTest(unittest.TestCase):
p.run()
# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
- self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
+ self.assertEqual([('e', Timestamp(10), [2, 1, 7, 4])], result)
+
+ def test_basic_execution_batch_sideinputs_fixed_windows(self):
+
+ # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+ global result # pylint: disable=global-variable-undefined
+ result = []
+
+ def recorded_elements(elem):
+ result.append(elem)
+ return elem
+
+ options = PipelineOptions()
+ options.view_as(StandardOptions).streaming = True
+ p = TestPipeline(options=options)
+
+ main_stream = (p
+ | 'main TestStream' >> TestStream()
+ .advance_watermark_to(2)
+ .add_elements(['a'])
+ .advance_watermark_to(4)
+ .add_elements(['b'])
+ | 'main window' >> beam.WindowInto(window.FixedWindows(1)))
+ side = (p
+ | beam.Create([2, 1, 4])
+ | beam.Map(lambda t: window.TimestampedValue(t, t))
+ | beam.WindowInto(window.FixedWindows(2)))
+
+ class RecordFn(beam.DoFn):
+ def process(self,
+ elm=beam.DoFn.ElementParam,
+ ts=beam.DoFn.TimestampParam,
+ side=beam.DoFn.SideInputParam):
+ yield (elm, ts, side)
+
+ records = (main_stream # pylint: disable=unused-variable
+ | beam.ParDo(RecordFn(), beam.pvalue.AsList(side))
+ | beam.Map(recorded_elements))
+ p.run()
+
+ # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+ self.assertEqual([('a', Timestamp(2), [2]),
+ ('b', Timestamp(4), [4])], result)
+
+ def test_basic_execution_sideinputs_fixed_windows(self):
+
+ # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+ global result # pylint: disable=global-variable-undefined
+ result = []
+
+ def recorded_elements(elem):
+ result.append(elem)
+ return elem
+
+ options = PipelineOptions()
+ options.view_as(StandardOptions).streaming = True
+ p = TestPipeline(options=options)
+
+ main_stream = (p
+ | 'main TestStream' >> TestStream()
+ .advance_watermark_to(9)
+ .add_elements(['a1', 'a2', 'a3', 'a4'])
+ .add_elements(['b'])
+ .advance_watermark_to(18)
+ .add_elements('c')
+ | 'main windowInto' >> beam.WindowInto(
+ window.FixedWindows(1))
+ )
+ side_stream = (p
+ | 'side TestStream' >> TestStream()
+ .advance_watermark_to(12)
+ .add_elements([window.TimestampedValue('s1', 10)])
+ .advance_watermark_to(20)
+ .add_elements([window.TimestampedValue('s2', 20)])
+ | 'side windowInto' >> beam.WindowInto(
+ window.FixedWindows(3))
+ )
+
+ class RecordFn(beam.DoFn):
+ def process(self,
+ elm=beam.DoFn.ElementParam,
+ ts=beam.DoFn.TimestampParam,
+ side=beam.DoFn.SideInputParam):
+ yield (elm, ts, side)
+
+ records = (main_stream # pylint: disable=unused-variable
+ | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream))
+ | beam.Map(recorded_elements))
+ p.run()
+
+ # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+ self.assertEqual([('a1', Timestamp(9), ['s1']),
+ ('a2', Timestamp(9), ['s1']),
+ ('a3', Timestamp(9), ['s1']),
+ ('a4', Timestamp(9), ['s1']),
+ ('b', Timestamp(9), ['s1']),
+ ('c', Timestamp(18), ['s2'])], result)
if __name__ == '__main__':
--
To stop receiving notification emails like this one, please contact
[email protected].