robertwb commented on code in PR #23828:
URL: https://github.com/apache/beam/pull/23828#discussion_r1021833690


##########
sdks/python/apache_beam/transforms/combiners_test.py:
##########
@@ -563,6 +563,70 @@ def has_expected_values(actual):
 
       assert_that(result, has_expected_values)
 
+  def test_combining_with_sliding_windows_and_fanout(self):
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+
+      def has_expected_values(actual):
+        from hamcrest.core import assert_that as hamcrest_assert
+        from hamcrest.library.collection import contains_exactly
+        ordered = sorted(actual)
+
+        hamcrest_assert(
+            ordered,
+            contains_exactly([0, 1, 2, 3], [0, 1, 2, 3, 5, 6, 7, 8],
+                             [5, 6, 7, 8]))
+
+      result = (
+          p
+          | beam.Create([
+              window.TimestampedValue(0, Timestamp(seconds=1666707510)),
+              window.TimestampedValue(1, Timestamp(seconds=1666707511)),
+              window.TimestampedValue(2, Timestamp(seconds=1666707512)),
+              window.TimestampedValue(3, Timestamp(seconds=1666707513)),
+              window.TimestampedValue(5, Timestamp(seconds=1666707515)),
+              window.TimestampedValue(6, Timestamp(seconds=1666707516)),
+              window.TimestampedValue(7, Timestamp(seconds=1666707517)),
+              window.TimestampedValue(8, Timestamp(seconds=1666707518))
+          ])
+          | beam.WindowInto(window.SlidingWindows(10, 5))
+          | beam.CombineGlobally(beam.combiners.ToListCombineFn()).
+          without_defaults().with_fanout(2))
+      assert_that(result, has_expected_values)
+
+  def test_combining_with_session_windows_and_fanout(self):
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+
+      def has_expected_values(actual):
+        from hamcrest.core import assert_that as hamcrest_assert
+        from hamcrest.library.collection import contains_exactly
+        ordered = sorted(actual)
+
+        hamcrest_assert(ordered, contains_exactly([0, 1, 2, 3], [5, 6, 7, 8]))
+        # Different runners have different number of 15s, but there should
+        # be at least one 15.
+
+      result = (
+          p
+          | beam.Create([
+              window.TimestampedValue(0, Timestamp(seconds=1666707510)),
+              window.TimestampedValue(1, Timestamp(seconds=1666707511)),
+              window.TimestampedValue(2, Timestamp(seconds=1666707512)),
+              window.TimestampedValue(3, Timestamp(seconds=1666707513)),
+              window.TimestampedValue(5, Timestamp(seconds=1666707515)),
+              window.TimestampedValue(6, Timestamp(seconds=1666707516)),
+              window.TimestampedValue(7, Timestamp(seconds=1666707517)),
+              window.TimestampedValue(8, Timestamp(seconds=1666707518))
+          ])
+          | beam.WindowInto(window.Sessions(2))
+          | beam.CombineGlobally(beam.combiners.ToListCombineFn()).
+          without_defaults().with_fanout(2))

Review Comment:
   Does this work if we have with_fanout(100) as well?



##########
sdks/python/apache_beam/transforms/combiners_test.py:
##########
@@ -563,6 +563,70 @@ def has_expected_values(actual):
 
       assert_that(result, has_expected_values)
 
+  def test_combining_with_sliding_windows_and_fanout(self):
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+
+      def has_expected_values(actual):
+        from hamcrest.core import assert_that as hamcrest_assert
+        from hamcrest.library.collection import contains_exactly
+        ordered = sorted(actual)
+
+        hamcrest_assert(
+            ordered,
+            contains_exactly([0, 1, 2, 3], [0, 1, 2, 3, 5, 6, 7, 8],
+                             [5, 6, 7, 8]))
+
+      result = (
+          p
+          | beam.Create([
+              window.TimestampedValue(0, Timestamp(seconds=1666707510)),
+              window.TimestampedValue(1, Timestamp(seconds=1666707511)),
+              window.TimestampedValue(2, Timestamp(seconds=1666707512)),
+              window.TimestampedValue(3, Timestamp(seconds=1666707513)),
+              window.TimestampedValue(5, Timestamp(seconds=1666707515)),
+              window.TimestampedValue(6, Timestamp(seconds=1666707516)),
+              window.TimestampedValue(7, Timestamp(seconds=1666707517)),
+              window.TimestampedValue(8, Timestamp(seconds=1666707518))
+          ])
+          | beam.WindowInto(window.SlidingWindows(10, 5))
+          | beam.CombineGlobally(beam.combiners.ToListCombineFn()).
+          without_defaults().with_fanout(2))
+      assert_that(result, has_expected_values)
+
+  def test_combining_with_session_windows_and_fanout(self):
+    options = PipelineOptions()
+    options.view_as(StandardOptions).streaming = True
+    with TestPipeline(options=options) as p:
+
+      def has_expected_values(actual):
+        from hamcrest.core import assert_that as hamcrest_assert
+        from hamcrest.library.collection import contains_exactly
+        ordered = sorted(actual)
+
+        hamcrest_assert(ordered, contains_exactly([0, 1, 2, 3], [5, 6, 7, 8]))
+        # Different runners have different number of 15s, but there should

Review Comment:
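   I'm not following this comment: the assertion above checks lists of element values, and no 15s appear anywhere in this test. Is it left over from another test?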



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to