This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fdbd7fa  [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated)  (#4949)
fdbd7fa is described below

commit fdbd7fafb3a8c26fde340494642f500fc3893f60
Author: mariapython <[email protected]>
AuthorDate: Mon Apr 2 12:08:07 2018 -0700

    [BEAM-3818] Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated)  (#4949)
    
    Add support for streaming side inputs in the DirectRunner (part 2: unblock tasks as the _SideInputsContainer gets updated)
---
 .../runners/direct/evaluation_context.py           | 129 ++++++++++++++-------
 sdks/python/apache_beam/runners/direct/executor.py |  21 +++-
 .../runners/direct/watermark_manager.py            |   4 +-
 .../python/apache_beam/testing/test_stream_test.py | 107 ++++++++++++++++-
 4 files changed, 208 insertions(+), 53 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index d0ab55f..893e32e 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -51,10 +51,10 @@ class _SideInputView(object):
 
   def __init__(self, view):
     self._view = view
-    self.callable_queue = collections.deque()
+    self.blocked_tasks = collections.deque()
     self.elements = []
     self.value = None
-    self.has_result = False
+    self.watermark = None
 
   def __repr__(self):
     elements_string = (', '.join(str(elm) for elm in self.elements)
@@ -69,67 +69,108 @@ class _SideInputsContainer(object):
   to a side input.
   """
 
-  def __init__(self, views):
+  def __init__(self, side_inputs):
     self._lock = threading.Lock()
     self._views = {}
-    self._transform_to_views = collections.defaultdict(list)
+    self._transform_to_side_inputs = collections.defaultdict(list)
+    self._side_input_to_blocked_tasks = collections.defaultdict(list)
 
-    for view in views:
-      self._views[view] = _SideInputView(view)
-      self._transform_to_views[view.pvalue.producer].append(view)
+    for side in side_inputs:
+      self._views[side] = _SideInputView(side)
+      self._transform_to_side_inputs[side.pvalue.producer].append(side)
 
   def __repr__(self):
     views_string = (', '.join(str(elm) for elm in self._views.values())
                     if self._views.values() else '[]')
     return '_SideInputsContainer(_views=%s)' % views_string
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
+    """Returns the value of a view whose task is unblocked or blocks its task.
+
+    It gets the value of a view whose watermark has been updated and
+    surpasses a given value.
+
+    Args:
+      side_input: ``_UnpickledSideInput`` value.
+      task: ``TransformExecutor`` task waiting on a side input.
+      block_until: Timestamp after which the task gets unblocked.
+
+    Returns:
+      The ``SideInputMap`` value of a view when the tasks it blocks are
+      unblocked. Otherwise, None.
+    """
     with self._lock:
       view = self._views[side_input]
-      if not view.has_result:
-        view.callable_queue.append(task)
+      if view.watermark and view.watermark.output_watermark >= block_until:
+        view.value = self._pvalue_to_value(side_input, view.elements)
+        return view.value
+      else:
+        view.blocked_tasks.append((task, block_until))
         task.blocked = True
-      return (view.has_result, view.value)
 
   def add_values(self, side_input, values):
     with self._lock:
       view = self._views[side_input]
-      assert not view.has_result
       view.elements.extend(values)
 
-  def finalize_value_and_get_tasks(self, side_input):
-    with self._lock:
-      view = self._views[side_input]
-      assert not view.has_result
-      assert view.value is None
-      assert view.callable_queue is not None
-      view.value = self._pvalue_to_value(side_input, view.elements)
-      view.elements = None
-      result = tuple(view.callable_queue)
-      for task in result:
-        task.blocked = False
-      view.callable_queue = None
-      view.has_result = True
-      return result
-
-  def update_watermarks_for_transform(self, ptransform, watermark):
-    # Collect tasks that get unblocked as the workflow progresses.
-    unblocked_tasks = []
-    for view in self._transform_to_views[ptransform]:
-      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
-    return unblocked_tasks
+  def update_watermarks_for_transform_and_unblock_tasks(self,
+                                                        ptransform,
+                                                        watermark):
+    """Updates _SideInputsContainer after a watermark update and unbloks tasks.
+
+    It traverses the list of side inputs per PTransform and calls
+    _update_watermarks_for_side_input_and_unblock_tasks to unblock tasks.
 
-  def _update_watermarks_for_view(self, view, watermark):
+    Args:
+      ptransform: Value of a PTransform.
+      watermark: Value of the watermark after an update for a PTransform.
+
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
     unblocked_tasks = []
-    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
-      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    for side in self._transform_to_side_inputs[ptransform]:
+      unblocked_tasks.extend(
+          self._update_watermarks_for_side_input_and_unblock_tasks(
+              side, watermark))
     return unblocked_tasks
 
-  def _pvalue_to_value(self, view, values):
-    """Given a side input view, returns the associated value in requested form.
+  def _update_watermarks_for_side_input_and_unblock_tasks(self,
+                                                          side_input,
+                                                          watermark):
+    """Helps update _SideInputsContainer after a watermark update.
+
+    For each view of the side input, it updates the value of the watermark
+    recorded when the watermark moved and unblocks tasks accordingly.
+
+    Args:
+      side_input: ``_UnpickledSideInput`` value.
+      watermark: Value of the watermark after an update for a PTransform.
+
+    Returns:
+      Tasks that get unblocked as a result of the watermark advancing.
+    """
+    with self._lock:
+      view = self._views[side_input]
+      view.watermark = watermark
+
+      unblocked_tasks = []
+      tasks_just_unblocked = []
+      for task, block_until in view.blocked_tasks:
+        if watermark.output_watermark >= block_until:
+          view.value = self._pvalue_to_value(side_input, view.elements)
+          unblocked_tasks.append(task)
+          tasks_just_unblocked.append((task, block_until))
+          task.blocked = False
+      for task in tasks_just_unblocked:
+        view.blocked_tasks.remove(task)
+      return unblocked_tasks
+
+  def _pvalue_to_value(self, side_input, values):
+    """Given a side input, returns the associated value in its requested form.
 
     Args:
-      view: SideInput for the requested side input.
+      side_input: _UnpickledSideInput object.
       values: Iterable values associated with the side input.
 
     Returns:
@@ -138,7 +179,9 @@ class _SideInputsContainer(object):
     Raises:
       ValueError: If values cannot be converted into the requested form.
     """
-    return sideinputs.SideInputMap(type(view), view._view_options(), values)
+    return sideinputs.SideInputMap(type(side_input),
+                                   side_input._view_options(),
+                                   values)
 
 
 class EvaluationContext(object):
@@ -321,10 +364,10 @@ class EvaluationContext(object):
     tw = self._watermark_manager.get_watermarks(transform)
     return tw.output_watermark == WatermarkManager.WATERMARK_POS_INF
 
-  def get_value_or_schedule_after_output(self, side_input, task):
+  def get_value_or_block_until_ready(self, side_input, task, block_until):
     assert isinstance(task, TransformExecutor)
-    return self._side_inputs_container.get_value_or_schedule_after_output(
-        side_input, task)
+    return self._side_inputs_container.get_value_or_block_until_ready(
+        side_input, task, block_until)
 
 
 class DirectUnmergedState(InMemoryUnmergedState):
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 5c9fe84..107f007 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -32,6 +32,7 @@ import six
 
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.execution import ScopedMetricsContainer
+from apache_beam.transforms import sideinputs
 
 
 class _ExecutorService(object):
@@ -271,6 +272,13 @@ class TransformExecutor(_ExecutorService.CallableTask):
     self._transform_evaluator_registry = transform_evaluator_registry
     self._evaluation_context = evaluation_context
     self._input_bundle = input_bundle
+    # For non-empty bundles, store the window of the max EOW.
+    # TODO(mariagh): Move to class _Bundle's inner _StackedWindowedValues
+    if input_bundle.has_elements():
+      self._latest_main_input_window = input_bundle._elements[0].windows[0]
+      for elem in input_bundle.get_elements_iterable():
+        if elem.windows[0].end > self._latest_main_input_window.end:
+          self._latest_main_input_window = elem.windows[0]
     self._fired_timers = fired_timers
     self._applied_ptransform = applied_ptransform
     self._completion_callback = completion_callback
@@ -288,11 +296,16 @@ class TransformExecutor(_ExecutorService.CallableTask):
     scoped_metrics_container = ScopedMetricsContainer(metrics_container)
 
     for side_input in self._applied_ptransform.side_inputs:
+      # Find the projection of main's window onto the side input's window.
+      window_mapping_fn = side_input._view_options().get(
+          'window_mapping_fn', sideinputs._global_window_mapping_fn)
+      main_onto_side_window = window_mapping_fn(self._latest_main_input_window)
+      block_until = main_onto_side_window.end
+
       if side_input not in self._side_input_values:
-        has_result, value = (
-            self._evaluation_context.get_value_or_schedule_after_output(
-                side_input, self))
-        if not has_result:
+        value = self._evaluation_context.get_value_or_block_until_ready(
+            side_input, self, block_until)
+        if not value:
           # Monitor task will reschedule this executor once the side input is
           # available.
           return
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 74a0216..7128ada 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -144,8 +144,10 @@ class WatermarkManager(object):
             for consumer in consumers:
               unblocked_tasks.extend(
                   self._refresh_watermarks(consumer, side_inputs_container))
+      # Notify the side_inputs_container.
       unblocked_tasks.extend(
-          side_inputs_container.update_watermarks_for_transform(
+          side_inputs_container
+          .update_watermarks_for_transform_and_unblock_tasks(
               applied_ptransform, tw))
     return unblocked_tasks
 
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index 3beeb4b..826ef8e 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -247,7 +247,7 @@ class TestStreamTest(unittest.TestCase):
     # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
     self.assertEqual([('k', ['a'])], result)
 
-  def test_basic_execution_sideinputs_batch(self):
+  def test_basic_execution_batch_sideinputs(self):
 
     # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
     global result     # pylint: disable=global-variable-undefined
@@ -301,13 +301,14 @@ class TestStreamTest(unittest.TestCase):
     main_stream = (p
                    | 'main TestStream' >> TestStream()
                    .advance_watermark_to(10)
-                   .add_elements(['e'])
-                   .advance_processing_time(11))
+                   .add_elements(['e']))
     side_stream = (p
                    | 'side TestStream' >> TestStream()
                    .add_elements([window.TimestampedValue(2, 2)])
                    .add_elements([window.TimestampedValue(1, 1)])
-                   .add_elements([window.TimestampedValue(4, 4)]))
+                   .add_elements([window.TimestampedValue(7, 7)])
+                   .add_elements([window.TimestampedValue(4, 4)])
+                  )
 
     class RecordFn(beam.DoFn):
       def process(self,
@@ -323,7 +324,103 @@ class TestStreamTest(unittest.TestCase):
     p.run()
 
     # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
+    self.assertEqual([('e', Timestamp(10), [2, 1, 7, 4])], result)
+
+  def test_basic_execution_batch_sideinputs_fixed_windows(self):
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    global result     # pylint: disable=global-variable-undefined
+    result = []
+
+    def recorded_elements(elem):
+      result.append(elem)
+      return elem
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .advance_watermark_to(2)
+                   .add_elements(['a'])
+                   .advance_watermark_to(4)
+                   .add_elements(['b'])
+                   | 'main window' >> beam.WindowInto(window.FixedWindows(1)))
+    side = (p
+            | beam.Create([2, 1, 4])
+            | beam.Map(lambda t: window.TimestampedValue(t, t))
+            | beam.WindowInto(window.FixedWindows(2)))
+
+    class RecordFn(beam.DoFn):
+      def process(self,
+                  elm=beam.DoFn.ElementParam,
+                  ts=beam.DoFn.TimestampParam,
+                  side=beam.DoFn.SideInputParam):
+        yield (elm, ts, side)
+
+    records = (main_stream     # pylint: disable=unused-variable
+               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side))
+               | beam.Map(recorded_elements))
+    p.run()
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    self.assertEqual([('a', Timestamp(2), [2]),
+                      ('b', Timestamp(4), [4])], result)
+
+  def test_basic_execution_sideinputs_fixed_windows(self):
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    global result     # pylint: disable=global-variable-undefined
+    result = []
+
+    def recorded_elements(elem):
+      result.append(elem)
+      return elem
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .advance_watermark_to(9)
+                   .add_elements(['a1', 'a2', 'a3', 'a4'])
+                   .add_elements(['b'])
+                   .advance_watermark_to(18)
+                   .add_elements('c')
+                   | 'main windowInto' >> beam.WindowInto(
+                       window.FixedWindows(1))
+                  )
+    side_stream = (p
+                   | 'side TestStream' >> TestStream()
+                   .advance_watermark_to(12)
+                   .add_elements([window.TimestampedValue('s1', 10)])
+                   .advance_watermark_to(20)
+                   .add_elements([window.TimestampedValue('s2', 20)])
+                   | 'side windowInto' >> beam.WindowInto(
+                       window.FixedWindows(3))
+                  )
+
+    class RecordFn(beam.DoFn):
+      def process(self,
+                  elm=beam.DoFn.ElementParam,
+                  ts=beam.DoFn.TimestampParam,
+                  side=beam.DoFn.SideInputParam):
+        yield (elm, ts, side)
+
+    records = (main_stream     # pylint: disable=unused-variable
+               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream))
+               | beam.Map(recorded_elements))
+    p.run()
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    self.assertEqual([('a1', Timestamp(9), ['s1']),
+                      ('a2', Timestamp(9), ['s1']),
+                      ('a3', Timestamp(9), ['s1']),
+                      ('a4', Timestamp(9), ['s1']),
+                      ('b', Timestamp(9), ['s1']),
+                      ('c', Timestamp(18), ['s2'])], result)
 
 
 if __name__ == '__main__':

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to