[
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=86706&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86706
]
ASF GitHub Bot logged work on BEAM-3818:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Apr/18 19:08
Start Date: 02/Apr/18 19:08
Worklog Time Spent: 10m
Work Description: aaltay closed pull request #4949: [BEAM-3818] Add
support for streaming side inputs in the DirectRunner (part 2: unblock tasks as
the _SideInputsContainer gets updated)
URL: https://github.com/apache/beam/pull/4949
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py
b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index d0ab55f5462..893e32e3357 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -51,10 +51,10 @@ class _SideInputView(object):
def __init__(self, view):
self._view = view
- self.callable_queue = collections.deque()
+ self.blocked_tasks = collections.deque()
self.elements = []
self.value = None
- self.has_result = False
+ self.watermark = None
def __repr__(self):
elements_string = (', '.join(str(elm) for elm in self.elements)
@@ -69,67 +69,108 @@ class _SideInputsContainer(object):
to a side input.
"""
- def __init__(self, views):
+ def __init__(self, side_inputs):
self._lock = threading.Lock()
self._views = {}
- self._transform_to_views = collections.defaultdict(list)
+ self._transform_to_side_inputs = collections.defaultdict(list)
+ self._side_input_to_blocked_tasks = collections.defaultdict(list)
- for view in views:
- self._views[view] = _SideInputView(view)
- self._transform_to_views[view.pvalue.producer].append(view)
+ for side in side_inputs:
+ self._views[side] = _SideInputView(side)
+ self._transform_to_side_inputs[side.pvalue.producer].append(side)
def __repr__(self):
views_string = (', '.join(str(elm) for elm in self._views.values())
if self._views.values() else '[]')
return '_SideInputsContainer(_views=%s)' % views_string
- def get_value_or_schedule_after_output(self, side_input, task):
+ def get_value_or_block_until_ready(self, side_input, task, block_until):
+ """Returns the value of a view whose task is unblocked or blocks its task.
+
+ It gets the value of a view whose watermark has been updated and
+ surpasses a given value.
+
+ Args:
+ side_input: ``_UnpickledSideInput`` value.
+ task: ``TransformExecutor`` task waiting on a side input.
+ block_until: Timestamp after which the task gets unblocked.
+
+ Returns:
+ The ``SideInputMap`` value of a view when the tasks it blocks are
+ unblocked. Otherwise, None.
+ """
with self._lock:
view = self._views[side_input]
- if not view.has_result:
- view.callable_queue.append(task)
+ if view.watermark and view.watermark.output_watermark >= block_until:
+ view.value = self._pvalue_to_value(side_input, view.elements)
+ return view.value
+ else:
+ view.blocked_tasks.append((task, block_until))
task.blocked = True
- return (view.has_result, view.value)
def add_values(self, side_input, values):
with self._lock:
view = self._views[side_input]
- assert not view.has_result
view.elements.extend(values)
- def finalize_value_and_get_tasks(self, side_input):
- with self._lock:
- view = self._views[side_input]
- assert not view.has_result
- assert view.value is None
- assert view.callable_queue is not None
- view.value = self._pvalue_to_value(side_input, view.elements)
- view.elements = None
- result = tuple(view.callable_queue)
- for task in result:
- task.blocked = False
- view.callable_queue = None
- view.has_result = True
- return result
-
- def update_watermarks_for_transform(self, ptransform, watermark):
- # Collect tasks that get unblocked as the workflow progresses.
- unblocked_tasks = []
- for view in self._transform_to_views[ptransform]:
- unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
- return unblocked_tasks
+ def update_watermarks_for_transform_and_unblock_tasks(self,
+ ptransform,
+ watermark):
+ """Updates _SideInputsContainer after a watermark update and unblocks tasks.
+
+ It traverses the list of side inputs per PTransform and calls
+ _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
- def _update_watermarks_for_view(self, view, watermark):
+ Args:
+ ptransform: Value of a PTransform.
+ watermark: Value of the watermark after an update for a PTransform.
+
+ Returns:
+ Tasks that get unblocked as a result of the watermark advancing.
+ """
unblocked_tasks = []
- if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
- unblocked_tasks = self.finalize_value_and_get_tasks(view)
+ for side in self._transform_to_side_inputs[ptransform]:
+ unblocked_tasks.extend(
+ self._update_watermarks_for_side_input_and_unblock_tasks(
+ side, watermark))
return unblocked_tasks
- def _pvalue_to_value(self, view, values):
- """Given a side input view, returns the associated value in requested form.
+ def _update_watermarks_for_side_input_and_unblock_tasks(self,
+ side_input,
+ watermark):
+ """Helps update _SideInputsContainer after a watermark update.
+
+ For each view of the side input, it updates the value of the watermark
+ recorded when the watermark moved and unblocks tasks accordingly.
+
+ Args:
+ side_input: ``_UnpickledSideInput`` value.
+ watermark: Value of the watermark after an update for a PTransform.
+
+ Returns:
+ Tasks that get unblocked as a result of the watermark advancing.
+ """
+ with self._lock:
+ view = self._views[side_input]
+ view.watermark = watermark
+
+ unblocked_tasks = []
+ tasks_just_unblocked = []
+ for task, block_until in view.blocked_tasks:
+ if watermark.output_watermark >= block_until:
+ view.value = self._pvalue_to_value(side_input, view.elements)
+ unblocked_tasks.append(task)
+ tasks_just_unblocked.append((task, block_until))
+ task.blocked = False
+ for task in tasks_just_unblocked:
+ view.blocked_tasks.remove(task)
+ return unblocked_tasks
+
+ def _pvalue_to_value(self, side_input, values):
+ """Given a side input, returns the associated value in its requested form.
Args:
- view: SideInput for the requested side input.
+ side_input: _UnpickledSideInput object.
values: Iterable values associated with the side input.
Returns:
@@ -138,7 +179,9 @@ def _pvalue_to_value(self, view, values):
Raises:
ValueError: If values cannot be converted into the requested form.
"""
- return sideinputs.SideInputMap(type(view), view._view_options(), values)
+ return sideinputs.SideInputMap(type(side_input),
+ side_input._view_options(),
+ values)
class EvaluationContext(object):
@@ -321,10 +364,10 @@ def _is_transform_done(self, transform):
tw = self._watermark_manager.get_watermarks(transform)
return tw.output_watermark == WatermarkManager.WATERMARK_POS_INF
- def get_value_or_schedule_after_output(self, side_input, task):
+ def get_value_or_block_until_ready(self, side_input, task, block_until):
assert isinstance(task, TransformExecutor)
- return self._side_inputs_container.get_value_or_schedule_after_output(
- side_input, task)
+ return self._side_inputs_container.get_value_or_block_until_ready(
+ side_input, task, block_until)
class DirectUnmergedState(InMemoryUnmergedState):
diff --git a/sdks/python/apache_beam/runners/direct/executor.py
b/sdks/python/apache_beam/runners/direct/executor.py
index 5c9fe841446..107f00764a2 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -32,6 +32,7 @@
from apache_beam.metrics.execution import MetricsContainer
from apache_beam.metrics.execution import ScopedMetricsContainer
+from apache_beam.transforms import sideinputs
class _ExecutorService(object):
@@ -271,6 +272,13 @@ def __init__(self, transform_evaluator_registry,
evaluation_context,
self._transform_evaluator_registry = transform_evaluator_registry
self._evaluation_context = evaluation_context
self._input_bundle = input_bundle
+ # For non-empty bundles, store the window of the max EOW.
+ # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+ if input_bundle.has_elements():
+ self._latest_main_input_window = input_bundle._elements[0].windows[0]
+ for elem in input_bundle.get_elements_iterable():
+ if elem.windows[0].end > self._latest_main_input_window.end:
+ self._latest_main_input_window = elem.windows[0]
self._fired_timers = fired_timers
self._applied_ptransform = applied_ptransform
self._completion_callback = completion_callback
@@ -288,11 +296,16 @@ def call(self):
scoped_metrics_container = ScopedMetricsContainer(metrics_container)
for side_input in self._applied_ptransform.side_inputs:
+ # Find the projection of main's window onto the side input's window.
+ window_mapping_fn = side_input._view_options().get(
+ 'window_mapping_fn', sideinputs._global_window_mapping_fn)
+ main_onto_side_window = window_mapping_fn(self._latest_main_input_window)
+ block_until = main_onto_side_window.end
+
if side_input not in self._side_input_values:
- has_result, value = (
- self._evaluation_context.get_value_or_schedule_after_output(
- side_input, self))
- if not has_result:
+ value = self._evaluation_context.get_value_or_block_until_ready(
+ side_input, self, block_until)
+ if not value:
# Monitor task will reschedule this executor once the side input is
# available.
return
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py
b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 74a021674f8..7128ada7871 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -144,8 +144,10 @@ def _refresh_watermarks(self, applied_ptransform,
side_inputs_container):
for consumer in consumers:
unblocked_tasks.extend(
self._refresh_watermarks(consumer, side_inputs_container))
+ # Notify the side_inputs_container.
unblocked_tasks.extend(
- side_inputs_container.update_watermarks_for_transform(
+ side_inputs_container
+ .update_watermarks_for_transform_and_unblock_tasks(
applied_ptransform, tw))
return unblocked_tasks
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py
b/sdks/python/apache_beam/testing/test_stream_test.py
index 3beeb4b2825..826ef8e0a48 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -247,7 +247,7 @@ def fired_elements(elem):
# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
self.assertEqual([('k', ['a'])], result)
- def test_basic_execution_sideinputs_batch(self):
+ def test_basic_execution_batch_sideinputs(self):
# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
global result # pylint: disable=global-variable-undefined
@@ -301,13 +301,14 @@ def recorded_elements(elem):
main_stream = (p
| 'main TestStream' >> TestStream()
.advance_watermark_to(10)
- .add_elements(['e'])
- .advance_processing_time(11))
+ .add_elements(['e']))
side_stream = (p
| 'side TestStream' >> TestStream()
.add_elements([window.TimestampedValue(2, 2)])
.add_elements([window.TimestampedValue(1, 1)])
- .add_elements([window.TimestampedValue(4, 4)]))
+ .add_elements([window.TimestampedValue(7, 7)])
+ .add_elements([window.TimestampedValue(4, 4)])
+ )
class RecordFn(beam.DoFn):
def process(self,
@@ -323,7 +324,103 @@ def process(self,
p.run()
# TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
- self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
+ self.assertEqual([('e', Timestamp(10), [2, 1, 7, 4])], result)
+
+ def test_basic_execution_batch_sideinputs_fixed_windows(self):
+
+ # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+ global result # pylint: disable=global-variable-undefined
+ result = []
+
+ def recorded_elements(elem):
+ result.append(elem)
+ return elem
+
+ options = PipelineOptions()
+ options.view_as(StandardOptions).streaming = True
+ p = TestPipeline(options=options)
+
+ main_stream = (p
+ | 'main TestStream' >> TestStream()
+ .advance_watermark_to(2)
+ .add_elements(['a'])
+ .advance_watermark_to(4)
+ .add_elements(['b'])
+ | 'main window' >> beam.WindowInto(window.FixedWindows(1)))
+ side = (p
+ | beam.Create([2, 1, 4])
+ | beam.Map(lambda t: window.TimestampedValue(t, t))
+ | beam.WindowInto(window.FixedWindows(2)))
+
+ class RecordFn(beam.DoFn):
+ def process(self,
+ elm=beam.DoFn.ElementParam,
+ ts=beam.DoFn.TimestampParam,
+ side=beam.DoFn.SideInputParam):
+ yield (elm, ts, side)
+
+ records = (main_stream # pylint: disable=unused-variable
+ | beam.ParDo(RecordFn(), beam.pvalue.AsList(side))
+ | beam.Map(recorded_elements))
+ p.run()
+
+ # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+ self.assertEqual([('a', Timestamp(2), [2]),
+ ('b', Timestamp(4), [4])], result)
+
+ def test_basic_execution_sideinputs_fixed_windows(self):
+
+ # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+ global result # pylint: disable=global-variable-undefined
+ result = []
+
+ def recorded_elements(elem):
+ result.append(elem)
+ return elem
+
+ options = PipelineOptions()
+ options.view_as(StandardOptions).streaming = True
+ p = TestPipeline(options=options)
+
+ main_stream = (p
+ | 'main TestStream' >> TestStream()
+ .advance_watermark_to(9)
+ .add_elements(['a1', 'a2', 'a3', 'a4'])
+ .add_elements(['b'])
+ .advance_watermark_to(18)
+ .add_elements('c')
+ | 'main windowInto' >> beam.WindowInto(
+ window.FixedWindows(1))
+ )
+ side_stream = (p
+ | 'side TestStream' >> TestStream()
+ .advance_watermark_to(12)
+ .add_elements([window.TimestampedValue('s1', 10)])
+ .advance_watermark_to(20)
+ .add_elements([window.TimestampedValue('s2', 20)])
+ | 'side windowInto' >> beam.WindowInto(
+ window.FixedWindows(3))
+ )
+
+ class RecordFn(beam.DoFn):
+ def process(self,
+ elm=beam.DoFn.ElementParam,
+ ts=beam.DoFn.TimestampParam,
+ side=beam.DoFn.SideInputParam):
+ yield (elm, ts, side)
+
+ records = (main_stream # pylint: disable=unused-variable
+ | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream))
+ | beam.Map(recorded_elements))
+ p.run()
+
+ # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+ self.assertEqual([('a1', Timestamp(9), ['s1']),
+ ('a2', Timestamp(9), ['s1']),
+ ('a3', Timestamp(9), ['s1']),
+ ('a4', Timestamp(9), ['s1']),
+ ('b', Timestamp(9), ['s1']),
+ ('c', Timestamp(18), ['s2'])], result)
if __name__ == '__main__':
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 86706)
Time Spent: 11h 10m (was: 11h)
> Add support for the streaming side inputs in the Python DirectRunner
> --------------------------------------------------------------------
>
> Key: BEAM-3818
> URL: https://issues.apache.org/jira/browse/BEAM-3818
> Project: Beam
> Issue Type: New Feature
> Components: sdk-py-core
> Reporter: María GH
> Assignee: María GH
> Priority: Minor
> Fix For: 3.0.0
>
> Time Spent: 11h 10m
> Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side input semantics.
> Currently, side inputs are only available for globally-windowed side input
> PCollections.
> Also, empty side inputs cause a pipeline stall.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)