[ 
https://issues.apache.org/jira/browse/BEAM-3377?focusedWorklogId=100937&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-100937
 ]

ASF GitHub Bot logged work on BEAM-3377:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/May/18 01:07
            Start Date: 11/May/18 01:07
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #5279: [BEAM-3377] Call 
assert_that per window via custom_windowing (flag)
URL: https://github.com/apache/beam/pull/5279
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/testing/test_stream_test.py 
b/sdks/python/apache_beam/testing/test_stream_test.py
index 826ef8e0a48..184a61e2a55 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -29,6 +29,7 @@
 from apache_beam.testing.test_stream import WatermarkEvent
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.testing.util import equal_to_per_window
 from apache_beam.transforms import trigger
 from apache_beam.transforms import window
 from apache_beam.transforms.window import FixedWindows
@@ -110,6 +111,7 @@ def process(self, element=beam.DoFn.ElementParam,
     p = TestPipeline(options=options)
     my_record_fn = RecordFn()
     records = p | test_stream | beam.ParDo(my_record_fn)
+
     assert_that(records, equal_to([
         ('a', timestamp.Timestamp(10)),
         ('b', timestamp.Timestamp(10)),
@@ -118,6 +120,29 @@ def process(self, element=beam.DoFn.ElementParam,
         ('e', timestamp.Timestamp(20)),
         ('late', timestamp.Timestamp(12)),
         ('last', timestamp.Timestamp(310)),]))
+
+    # assert per window
+    expected_window_to_elements = {
+        window.IntervalWindow(0, 15): [
+            ('a', Timestamp(10)),
+            ('b', Timestamp(10)),
+            ('c', Timestamp(10)),
+            ('late', Timestamp(12))
+        ],
+        window.IntervalWindow(15, 30): [
+            ('d', Timestamp(20)),
+            ('e', Timestamp(20))
+        ],
+        window.IntervalWindow(300, 315): [
+            ('last', Timestamp(310)),
+        ],
+    }
+    assert_that(
+        records,
+        equal_to_per_window(expected_window_to_elements),
+        custom_windowing=window.FixedWindows(15),
+        label='assert per window')
+
     p.run()
 
   def test_gbk_execution_no_triggers(self):
@@ -132,14 +157,6 @@ def test_gbk_execution_no_triggers(self):
                    .add_elements([TimestampedValue('late', 12)])
                    .add_elements([TimestampedValue('last', 310)]))
 
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    global result     # pylint: disable=global-variable-undefined
-    result = []
-
-    def fired_elements(elem):
-      result.append(elem)
-      return elem
-
     options = PipelineOptions()
     options.view_as(StandardOptions).streaming = True
     p = TestPipeline(options=options)
@@ -147,8 +164,8 @@ def fired_elements(elem):
                | test_stream
                | beam.WindowInto(FixedWindows(15))
                | beam.Map(lambda x: ('k', x))
-               | beam.GroupByKey()
-               | beam.Map(fired_elements))
+               | beam.GroupByKey())
+
     # TODO(BEAM-2519): timestamp assignment for elements from a GBK should
     # respect the TimestampCombiner.  The test below should also verify the
     # timestamps of the outputted elements once this is implemented.
@@ -157,13 +174,27 @@ def fired_elements(elem):
         ('k', ['d', 'e']),
         ('k', ['late']),
         ('k', ['last'])]))
+
+    # assert per window
+    expected_window_to_elements = {
+        window.IntervalWindow(15, 30): [
+            ('k', ['a', 'b', 'c']),
+            ('k', ['late']),
+        ],
+        window.IntervalWindow(30, 45): [
+            ('k', ['d', 'e']),
+        ],
+        window.IntervalWindow(300, 315): [
+            ('k', ['last']),
+        ],
+    }
+    assert_that(
+        records,
+        equal_to_per_window(expected_window_to_elements),
+        custom_windowing=window.FixedWindows(15),
+        label='assert per window')
+
     p.run()
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    self.assertEqual([
-        ('k', ['a', 'b', 'c']),
-        ('k', ['d', 'e']),
-        ('k', ['late']),
-        ('k', ['last'])], result)
 
   def test_gbk_execution_after_watermark_trigger(self):
     test_stream = (TestStream()
@@ -171,14 +202,6 @@ def test_gbk_execution_after_watermark_trigger(self):
                    .add_elements(['a'])
                    .advance_watermark_to(20))
 
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    global result   # pylint: disable=global-variable-undefined
-    result = []
-
-    def fired_elements(elem):
-      result.append(elem)
-      return elem
-
     options = PipelineOptions()
     options.view_as(StandardOptions).streaming = True
     p = TestPipeline(options=options)
@@ -189,19 +212,26 @@ def fired_elements(elem):
                    trigger=trigger.AfterWatermark(early=trigger.AfterCount(1)),
                    accumulation_mode=trigger.AccumulationMode.DISCARDING)
                | beam.Map(lambda x: ('k', x))
-               | beam.GroupByKey()
-               | beam.Map(fired_elements))
+               | beam.GroupByKey())
+
     # TODO(BEAM-2519): timestamp assignment for elements from a GBK should
     # respect the TimestampCombiner.  The test below should also verify the
     # timestamps of the outputted elements once this is implemented.
 
-    # TODO(BEAM-3377): Reinstate after assert_that in streaming is fixed.
-    # assert_that(records, equal_to([
-    #     ('k', ['a']), ('k', [])]))
+    # assert per window
+    expected_window_to_elements = {
+        window.IntervalWindow(15, 30): [
+            ('k', ['a']),
+            ('k', []),
+        ],
+    }
+    assert_that(
+        records,
+        equal_to_per_window(expected_window_to_elements),
+        custom_windowing=window.FixedWindows(15),
+        label='assert per window')
 
     p.run()
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    self.assertEqual([('k', ['a']), ('k', [])], result)
 
   def test_gbk_execution_after_processing_trigger_fired(self):
     """Advance TestClock to (X + delta) and see the pipeline does finish."""
@@ -214,14 +244,6 @@ def 
test_gbk_execution_after_processing_trigger_fired(self):
                    .add_elements(['a'])
                    .advance_processing_time(5.1))
 
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    global result     # pylint: disable=global-variable-undefined
-    result = []
-
-    def fired_elements(elem):
-      result.append(elem)
-      return elem
-
     options = PipelineOptions()
     options.view_as(StandardOptions).streaming = True
     p = TestPipeline(options=options)
@@ -233,30 +255,28 @@ def fired_elements(elem):
                    accumulation_mode=trigger.AccumulationMode.DISCARDING
                    )
                | beam.Map(lambda x: ('k', x))
-               | beam.GroupByKey()
-               | beam.Map(fired_elements))
+               | beam.GroupByKey())
+
     # TODO(BEAM-2519): timestamp assignment for elements from a GBK should
     # respect the TimestampCombiner.  The test below should also verify the
     # timestamps of the outputted elements once this is implemented.
 
-    # TODO(BEAM-3377): Reinstate after assert_that in streaming is fixed.
     assert_that(records, equal_to([
         ('k', ['a'])]))
 
+    # assert per window
+    expected_window_to_elements = {
+        window.IntervalWindow(15, 30): [('k', ['a'])],
+    }
+    assert_that(
+        records,
+        equal_to_per_window(expected_window_to_elements),
+        custom_windowing=window.FixedWindows(15),
+        label='assert per window')
+
     p.run()
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    self.assertEqual([('k', ['a'])], result)
 
   def test_basic_execution_batch_sideinputs(self):
-
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    global result     # pylint: disable=global-variable-undefined
-    result = []
-
-    def recorded_elements(elem):
-      result.append(elem)
-      return elem
-
     options = PipelineOptions()
     options.view_as(StandardOptions).streaming = True
     p = TestPipeline(options=options)
@@ -277,23 +297,24 @@ def process(self,
         yield (elm, ts, side)
 
     records = (main_stream     # pylint: disable=unused-variable
-               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side))
-               | beam.Map(recorded_elements))
+               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)))
+
+    # assert per window
+    expected_window_to_elements = {
+        window.IntervalWindow(0, 15): [
+            ('e', Timestamp(10), [2, 1, 4]),
+        ],
+    }
+    assert_that(
+        records,
+        equal_to_per_window(expected_window_to_elements),
+        custom_windowing=window.FixedWindows(15),
+        label='assert per window')
+
+    assert_that(records, equal_to([('e', Timestamp(10), [2, 1, 4])]))
     p.run()
 
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    self.assertEqual([('e', Timestamp(10), [2, 1, 4])], result)
-
   def test_basic_execution_sideinputs(self):
-
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    global result     # pylint: disable=global-variable-undefined
-    result = []
-
-    def recorded_elements(elem):
-      result.append(elem)
-      return elem
-
     options = PipelineOptions()
     options.view_as(StandardOptions).streaming = True
     p = TestPipeline(options=options)
@@ -318,24 +339,24 @@ def process(self,
         yield (elm, ts, side)
 
     records = (main_stream        # pylint: disable=unused-variable
-               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream))
-               | beam.Map(recorded_elements))
-
+               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream)))
+
+    # assert per window
+    expected_window_to_elements = {
+        window.IntervalWindow(0, 15): [
+            ('e', Timestamp(10), [2, 1, 7, 4]),
+        ],
+    }
+    assert_that(
+        records,
+        equal_to_per_window(expected_window_to_elements),
+        custom_windowing=window.FixedWindows(15),
+        label='assert per window')
+
+    assert_that(records, equal_to([('e', Timestamp(10), [2, 1, 7, 4])]))
     p.run()
 
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    self.assertEqual([('e', Timestamp(10), [2, 1, 7, 4])], result)
-
   def test_basic_execution_batch_sideinputs_fixed_windows(self):
-
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    global result     # pylint: disable=global-variable-undefined
-    result = []
-
-    def recorded_elements(elem):
-      result.append(elem)
-      return elem
-
     options = PipelineOptions()
     options.view_as(StandardOptions).streaming = True
     p = TestPipeline(options=options)
@@ -360,24 +381,22 @@ def process(self,
         yield (elm, ts, side)
 
     records = (main_stream     # pylint: disable=unused-variable
-               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side))
-               | beam.Map(recorded_elements))
-    p.run()
+               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)))
+
+    # assert per window
+    expected_window_to_elements = {
+        window.IntervalWindow(2, 3):[('a', Timestamp(2), [2])],
+        window.IntervalWindow(4, 5):[('b', Timestamp(4), [4])]
+    }
+    assert_that(
+        records,
+        equal_to_per_window(expected_window_to_elements),
+        custom_windowing=window.FixedWindows(1),
+        label='assert per window')
 
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    self.assertEqual([('a', Timestamp(2), [2]),
-                      ('b', Timestamp(4), [4])], result)
+    p.run()
 
   def test_basic_execution_sideinputs_fixed_windows(self):
-
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    global result     # pylint: disable=global-variable-undefined
-    result = []
-
-    def recorded_elements(elem):
-      result.append(elem)
-      return elem
-
     options = PipelineOptions()
     options.view_as(StandardOptions).streaming = True
     p = TestPipeline(options=options)
@@ -410,17 +429,26 @@ def process(self,
         yield (elm, ts, side)
 
     records = (main_stream     # pylint: disable=unused-variable
-               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream))
-               | beam.Map(recorded_elements))
-    p.run()
+               | beam.ParDo(RecordFn(), beam.pvalue.AsList(side_stream)))
+
+    # assert per window
+    expected_window_to_elements = {
+        window.IntervalWindow(9, 10): [
+            ('a1', Timestamp(9), ['s1']),
+            ('a2', Timestamp(9), ['s1']),
+            ('a3', Timestamp(9), ['s1']),
+            ('a4', Timestamp(9), ['s1']),
+            ('b', Timestamp(9), ['s1'])
+        ],
+        window.IntervalWindow(18, 19):[('c', Timestamp(18), ['s2'])],
+    }
+    assert_that(
+        records,
+        equal_to_per_window(expected_window_to_elements),
+        custom_windowing=window.FixedWindows(1),
+        label='assert per window')
 
-    # TODO(BEAM-3377): Remove after assert_that in streaming is fixed.
-    self.assertEqual([('a1', Timestamp(9), ['s1']),
-                      ('a2', Timestamp(9), ['s1']),
-                      ('a3', Timestamp(9), ['s1']),
-                      ('a4', Timestamp(9), ['s1']),
-                      ('b', Timestamp(9), ['s1']),
-                      ('c', Timestamp(18), ['s2'])], result)
+    p.run()
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/testing/util.py 
b/sdks/python/apache_beam/testing/util.py
index e442425505c..5caa78ce919 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -75,6 +75,31 @@ def __repr__(self):
   return InAnyOrder(iterable)
 
 
+def equal_to_per_window(expected_window_to_elements):
+  """Matcher used by assert_that to check on values for specific windows.
+
+  Arguments:
+    expected_window_to_elements: A dictionary where the keys are the windows
+      to check and the values are the elements associated with each window.
+  """
+  def matcher(elements):
+    actual_elements_in_window, window = elements
+    if window in expected_window_to_elements:
+      expected_elements_in_window = list(
+          expected_window_to_elements[window])
+      sorted_expected = sorted(expected_elements_in_window)
+      sorted_actual = sorted(actual_elements_in_window)
+      if sorted_expected != sorted_actual:
+        # Results for the same window don't necessarily come all
+        # at once. Hence the same actual window may contain only
+        # subsets of the expected elements for the window.
+        # For example, in the presence of early triggers.
+        if all(elem in sorted_expected for elem in sorted_actual) is False:
+          raise BeamAssertException(
+              'Failed assert: %r == %r' % (sorted_expected, sorted_actual))
+  return matcher
+
+
 # Note that equal_to always sorts the expected and actual since what we
 # compare are PCollections for which there is no guaranteed order.
 # However the sorting does not go beyond top level therefore [1,2] and [2,1]
@@ -100,7 +125,8 @@ def _empty(actual):
   return _empty
 
 
-def assert_that(actual, matcher, label='assert_that', reify_windows=False):
+def assert_that(actual, matcher, custom_windowing=None,
+                label='assert_that', reify_windows=False):
   """A PTransform that checks a PCollection has an expected value.
 
   Note that assert_that should be used only for testing pipelines since the
@@ -113,6 +139,8 @@ def assert_that(actual, matcher, label='assert_that', 
reify_windows=False):
       expectations and raises BeamAssertException if they are not met.
     label: Optional string label. This is needed in case several assert_that
       transforms are introduced in the same pipeline.
+    custom_windowing: If specified, matcher is passed a dictionary of
+      (k, v) = (window, elements in the window).
     reify_windows: If True, matcher is passed a list of TestWindowedValue.
 
   Returns:
@@ -128,23 +156,29 @@ def process(self, element, timestamp=DoFn.TimestampParam,
       # the timestamp and window out of the latter.
       return [TestWindowedValue(element, timestamp, [window])]
 
+  class AddWindow(DoFn):
+    def process(self, element, window=DoFn.WindowParam):
+      yield element, window
+
   class AssertThat(PTransform):
 
     def expand(self, pcoll):
       if reify_windows:
         pcoll = pcoll | ParDo(ReifyTimestampWindow())
 
-      # We must have at least a single element to ensure the matcher
-      # code gets run even if the input pcollection is empty.
       keyed_singleton = pcoll.pipeline | Create([(None, None)])
       keyed_actual = (
           pcoll
-          | WindowInto(window.GlobalWindows())
+          | WindowInto(custom_windowing or window.GlobalWindows())
           | "ToVoidKey" >> Map(lambda v: (None, v)))
-      _ = ((keyed_singleton, keyed_actual)
-           | "Group" >> CoGroupByKey()
-           | "Unkey" >> Map(lambda k___actual_values: k___actual_values[1][1])
-           | "Match" >> Map(matcher))
+      plain_actual = ((keyed_singleton, keyed_actual)
+                      | "Group" >> CoGroupByKey()
+                      | "Unkey" >> Map(lambda k_values: k_values[1][1]))
+
+      if custom_windowing:
+        plain_actual = plain_actual | "AddWindow" >> ParDo(AddWindow())
+
+      plain_actual = plain_actual | "Match" >> Map(matcher)
 
     def default_label(self):
       return label
diff --git a/sdks/python/apache_beam/testing/util_test.py 
b/sdks/python/apache_beam/testing/util_test.py
index e4e86941669..8e362ac6936 100644
--- a/sdks/python/apache_beam/testing/util_test.py
+++ b/sdks/python/apache_beam/testing/util_test.py
@@ -44,18 +44,18 @@ def test_assert_that_passes_empty_is_empty(self):
     with TestPipeline() as p:
       assert_that(p | Create([]), is_empty())
 
-  def test_windowed_value_passes(self):
-    expected = [TestWindowedValue(v, MIN_TIMESTAMP, [GlobalWindow()])
-                for v in [1, 2, 3]]
-    with TestPipeline() as p:
-      assert_that(p | Create([2, 3, 1]), equal_to(expected), 
reify_windows=True)
-
   def test_assert_that_fails(self):
     with self.assertRaises(Exception):
       with TestPipeline() as p:
         assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3]))
 
-  def test_windowed_value_assert_fail_unmatched_value(self):
+  def test_reified_value_passes(self):
+    expected = [TestWindowedValue(v, MIN_TIMESTAMP, [GlobalWindow()])
+                for v in [1, 2, 3]]
+    with TestPipeline() as p:
+      assert_that(p | Create([2, 3, 1]), equal_to(expected), 
reify_windows=True)
+
+  def test_reified_value_assert_fail_unmatched_value(self):
     expected = [TestWindowedValue(v + 1, MIN_TIMESTAMP, [GlobalWindow()])
                 for v in [1, 2, 3]]
     with self.assertRaises(Exception):
@@ -63,7 +63,7 @@ def test_windowed_value_assert_fail_unmatched_value(self):
         assert_that(p | Create([2, 3, 1]), equal_to(expected),
                     reify_windows=True)
 
-  def test_windowed_value_assert_fail_unmatched_timestamp(self):
+  def test_reified_value_assert_fail_unmatched_timestamp(self):
     expected = [TestWindowedValue(v, 1, [GlobalWindow()])
                 for v in [1, 2, 3]]
     with self.assertRaises(Exception):
@@ -71,7 +71,7 @@ def test_windowed_value_assert_fail_unmatched_timestamp(self):
         assert_that(p | Create([2, 3, 1]), equal_to(expected),
                     reify_windows=True)
 
-  def test_windowed_value_assert_fail_unmatched_window(self):
+  def test_reified_value_assert_fail_unmatched_window(self):
     expected = [TestWindowedValue(v, MIN_TIMESTAMP, [IntervalWindow(0, 1)])
                 for v in [1, 2, 3]]
     with self.assertRaises(Exception):


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 100937)
    Time Spent: 3h 50m  (was: 3h 40m)

> assert_that not working for streaming
> -------------------------------------
>
>                 Key: BEAM-3377
>                 URL: https://issues.apache.org/jira/browse/BEAM-3377
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.2.0
>            Reporter: MarĂ­a GH
>            Priority: Major
>              Labels: starter
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> assert_that does not work for AfterWatermark timers.
> Easy way to reproduce: modify test_gbk_execution [1] in this form:
>  
> {code:java}
>  def test_this(self):
>     test_stream = (TestStream()
>                    .add_elements(['a', 'b', 'c'])
>                    .advance_watermark_to(20))
>     def fnc(x):
>       print 'fired_elem:', x
>       return x
>     options = PipelineOptions()
>     options.view_as(StandardOptions).streaming = True
>     p = TestPipeline(options=options)
>     records = (p
>                | test_stream
>                | beam.WindowInto(
>                    FixedWindows(15),
>                    
> trigger=trigger.AfterWatermark(early=trigger.AfterCount(2)),
>                    accumulation_mode=trigger.AccumulationMode.ACCUMULATING)
>                | beam.Map(lambda x: ('k', x))
>                | beam.GroupByKey())
>     assert_that(records, equal_to([
>         ('k', ['a', 'b', 'c'])]))
>     p.run()
> {code}
> This test will pass, but if the .advance_watermark_to(20) is removed, the 
> test will fail. However, both cases fire the same elements:
>       fired_elem: ('k', ['a', 'b', 'c'])
>       fired_elem: ('k', ['a', 'b', 'c'])
> In the passing case, they correspond to the sorted_actual inside the 
> assert_that. In the failing case:
>       sorted_actual: [('k', ['a', 'b', 'c']), ('k', ['a', 'b', 'c'])]
>       sorted_actual: []
> [1] 
> https://github.com/mariapython/incubator-beam/blob/direct-timers-show/sdks/python/apache_beam/testing/test_stream_test.py#L120



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to