[ 
https://issues.apache.org/jira/browse/BEAM-3818?focusedWorklogId=81917&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81917
 ]

ASF GitHub Bot logged work on BEAM-3818:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Mar/18 16:51
            Start Date: 19/Mar/18 16:51
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #4838: [BEAM-3818] Add 
support for streaming side inputs in the DirectRunner (part I: update 
_SideInputsContainer as the watermark advances)
URL: https://github.com/apache/beam/pull/4838
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/sdks/python/apache_beam/runners/direct/evaluation_context.py b/sdks/python/apache_beam/runners/direct/evaluation_context.py
index 46176c9e969..d0ab55f5462 100644
--- a/sdks/python/apache_beam/runners/direct/evaluation_context.py
+++ b/sdks/python/apache_beam/runners/direct/evaluation_context.py
@@ -56,6 +56,11 @@ def __init__(self, view):
     self.value = None
     self.has_result = False
 
+  def __repr__(self):
+    elements_string = (', '.join(str(elm) for elm in self.elements)
+                       if self.elements else '[]')
+    return '_SideInputView(elements=%s)' % elements_string
+
 
 class _SideInputsContainer(object):
   """An in-process container for side inputs.
@@ -67,8 +72,16 @@ class _SideInputsContainer(object):
   def __init__(self, views):
     self._lock = threading.Lock()
     self._views = {}
+    self._transform_to_views = collections.defaultdict(list)
+
     for view in views:
       self._views[view] = _SideInputView(view)
+      self._transform_to_views[view.pvalue.producer].append(view)
+
+  def __repr__(self):
+    views_string = (', '.join(str(elm) for elm in self._views.values())
+                    if self._views.values() else '[]')
+    return '_SideInputsContainer(_views=%s)' % views_string
 
   def get_value_or_schedule_after_output(self, side_input, task):
     with self._lock:
@@ -99,6 +112,19 @@ def finalize_value_and_get_tasks(self, side_input):
       view.has_result = True
       return result
 
+  def update_watermarks_for_transform(self, ptransform, watermark):
+    # Collect tasks that get unblocked as the workflow progresses.
+    unblocked_tasks = []
+    for view in self._transform_to_views[ptransform]:
+      unblocked_tasks.extend(self._update_watermarks_for_view(view, watermark))
+    return unblocked_tasks
+
+  def _update_watermarks_for_view(self, view, watermark):
+    unblocked_tasks = []
+    if watermark.input_watermark == WatermarkManager.WATERMARK_POS_INF:
+      unblocked_tasks = self.finalize_value_and_get_tasks(view)
+    return unblocked_tasks
+
   def _pvalue_to_value(self, view, values):
     """Given a side input view, returns the associated value in requested form.
 
@@ -149,10 +175,10 @@ def __init__(self, pipeline_options, bundle_factory, root_transforms,
       self._pcollection_to_views[view.pvalue].append(view)
     self._transform_keyed_states = self._initialize_keyed_states(
         root_transforms, value_to_consumers)
+    self._side_inputs_container = _SideInputsContainer(views)
     self._watermark_manager = WatermarkManager(
         clock, root_transforms, value_to_consumers,
         self._transform_keyed_states)
-    self._side_inputs_container = _SideInputsContainer(views)
     self._pending_unblocked_tasks = []
     self._counter_factory = counters.CounterFactory()
     self._metrics = DirectMetrics()
@@ -199,9 +225,6 @@ def handle_result(
       committed_bundles, unprocessed_bundles = self._commit_bundles(
           result.uncommitted_output_bundles,
           result.unprocessed_bundles)
-      self._watermark_manager.update_watermarks(
-          completed_bundle, result.transform, completed_timers,
-          committed_bundles, unprocessed_bundles, result.keyed_watermark_holds)
 
       self._metrics.commit_logical(completed_bundle,
                                    result.logical_metric_updates)
@@ -217,11 +240,13 @@ def handle_result(
             self._side_inputs_container.add_values(
                 view,
                 committed_bundle.get_elements_iterable(make_copy=True))
-          if (self.get_execution_context(result.transform)
-              .watermarks.input_watermark
-              == WatermarkManager.WATERMARK_POS_INF):
-            self._pending_unblocked_tasks.extend(
-                self._side_inputs_container.finalize_value_and_get_tasks(view))
+
+      # Tasks generated from unblocked side inputs as the watermark progresses.
+      tasks = self._watermark_manager.update_watermarks(
+          completed_bundle, result.transform, completed_timers,
+          committed_bundles, unprocessed_bundles, result.keyed_watermark_holds,
+          self._side_inputs_container)
+      self._pending_unblocked_tasks.extend(tasks)
 
       if result.counters:
         for counter in result.counters:
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 084073f4fe7..74a021674f8 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -94,14 +94,14 @@ def get_watermarks(self, applied_ptransform):
 
   def update_watermarks(self, completed_committed_bundle, applied_ptransform,
                         completed_timers, outputs, unprocessed_bundles,
-                        keyed_earliest_holds):
+                        keyed_earliest_holds, side_inputs_container):
     assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
     self._update_pending(
         completed_committed_bundle, applied_ptransform, completed_timers,
         outputs, unprocessed_bundles)
     tw = self.get_watermarks(applied_ptransform)
     tw.hold(keyed_earliest_holds)
-    self._refresh_watermarks(applied_ptransform)
+    return self._refresh_watermarks(applied_ptransform, side_inputs_container)
 
   def _update_pending(self, input_committed_bundle, applied_ptransform,
                       completed_timers, output_committed_bundles,
@@ -128,8 +128,9 @@ def _update_pending(self, input_committed_bundle, applied_ptransform,
     if input_committed_bundle and input_committed_bundle.has_elements():
       completed_tw.remove_pending(input_committed_bundle)
 
-  def _refresh_watermarks(self, applied_ptransform):
+  def _refresh_watermarks(self, applied_ptransform, side_inputs_container):
     assert isinstance(applied_ptransform, pipeline.AppliedPTransform)
+    unblocked_tasks = []
     tw = self.get_watermarks(applied_ptransform)
     if tw.refresh():
       for pval in applied_ptransform.outputs.values():
@@ -141,7 +142,12 @@ def _refresh_watermarks(self, applied_ptransform):
           if v in self._value_to_consumers:  # If there are downstream 
consumers
             consumers = self._value_to_consumers[v]
             for consumer in consumers:
-              self._refresh_watermarks(consumer)
+              unblocked_tasks.extend(
+                  self._refresh_watermarks(consumer, side_inputs_container))
+      unblocked_tasks.extend(
+          side_inputs_container.update_watermarks_for_transform(
+              applied_ptransform, tw))
+    return unblocked_tasks
 
   def extract_all_timers(self):
     """Extracts fired timers for all transforms
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index a3f2413f167..3beeb4b2825 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -30,9 +30,11 @@
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms import trigger
+from apache_beam.transforms import window
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.utils import timestamp
+from apache_beam.utils.timestamp import Timestamp
 from apache_beam.utils.windowed_value import WindowedValue
 
 
@@ -245,6 +247,84 @@ def fired_elements(elem):
     # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
     self.assertEqual([('k', ['a'])], result)
 
+  def test_basic_execution_sideinputs_batch(self):
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    global result     # pylint: disable=global-variable-undefined
+    result = []
+
+    def recorded_elements(elem):
+      result.append(elem)
+      return elem
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .advance_watermark_to(10)
+                   .add_elements(['e']))
+    side = (p
+            | beam.Create([2, 1, 4])
+            | beam.Map(lambda t: window.TimestampedValue(t, t)))
+
+    class RecordFn(beam.DoFn):
+      def process(self,
+                  elm=beam.DoFn.ElementParam,
+                  ts=beam.DoFn.TimestampParam,
+                  side=beam.DoFn.SideInputParam):
+        yield (elm, ts, side)
+
+    records = (main_stream     # pylint: disable=unused-variable
+               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side))
+               | beam.Map(recorded_elements))
+    p.run()
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
+
+  def test_basic_execution_sideinputs(self):
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    global result     # pylint: disable=global-variable-undefined
+    result = []
+
+    def recorded_elements(elem):
+      result.append(elem)
+      return elem
+
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    p = TestPipeline(options=options)
+
+    main_stream = (p
+                   | 'main TestStream' >> TestStream()
+                   .advance_watermark_to(10)
+                   .add_elements(['e'])
+                   .advance_processing_time(11))
+    side_stream = (p
+                   | 'side TestStream' >> TestStream()
+                   .add_elements([window.TimestampedValue(2, 2)])
+                   .add_elements([window.TimestampedValue(1, 1)])
+                   .add_elements([window.TimestampedValue(4, 4)]))
+
+    class RecordFn(beam.DoFn):
+      def process(self,
+                  elm=beam.DoFn.ElementParam,
+                  ts=beam.DoFn.TimestampParam,
+                  side=beam.DoFn.SideInputParam):
+        yield (elm, ts, side)
+
+    records = (main_stream        # pylint: disable=unused-variable
+               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream))
+               | beam.Map(recorded_elements))
+
+    p.run()
+
+    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
+    self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
+
 
 if __name__ == '__main__':
   unittest.main()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 81917)
    Time Spent: 4h 40m  (was: 4.5h)

> Add support for the streaming side inputs in the Python DirectRunner
> --------------------------------------------------------------------
>
>                 Key: BEAM-3818
>                 URL: https://issues.apache.org/jira/browse/BEAM-3818
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: María GH
>            Assignee: María GH
>            Priority: Minor
>             Fix For: 3.0.0
>
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>
> The streaming DirectRunner should support streaming side-input semantics.
> Currently, side inputs are available only for globally windowed side-input
> PCollections.
> Also, empty side inputs cause the pipeline to stall.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to