This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3477d4a76d4 Revert "Force discarding mode in with_fanout without rewindowing." (#24378)
3477d4a76d4 is described below

commit 3477d4a76d46c1ad3c89acb636ea7807bfc23e15
Author: tvalentyn <[email protected]>
AuthorDate: Mon Nov 28 13:16:33 2022 -0800

    Revert "Force discarding mode in with_fanout without rewindowing." (#24378)
---
 .../apache_beam/transforms/combiners_test.py       | 61 ----------------------
 sdks/python/apache_beam/transforms/core.py         | 14 ++---
 2 files changed, 4 insertions(+), 71 deletions(-)

diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 2ea0ba862b9..7e0e83542ee 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -563,67 +563,6 @@ class CombineTest(unittest.TestCase):
 
       assert_that(result, has_expected_values)
 
-  def test_combining_with_sliding_windows_and_fanout(self):
-    options = PipelineOptions()
-    options.view_as(StandardOptions).streaming = True
-    with TestPipeline(options=options) as p:
-
-      def has_expected_values(actual):
-        from hamcrest.core import assert_that as hamcrest_assert
-        from hamcrest.library.collection import only_contains
-        ordered = sorted(actual)
-
-        hamcrest_assert(
-            ordered,
-            only_contains([0, 1, 2, 3], [0, 1, 2, 3, 5, 6, 7, 8], [5, 6, 7, 8]))
-
-      result = (
-          p
-          | beam.Create([
-              window.TimestampedValue(0, Timestamp(seconds=1666707510)),
-              window.TimestampedValue(1, Timestamp(seconds=1666707511)),
-              window.TimestampedValue(2, Timestamp(seconds=1666707512)),
-              window.TimestampedValue(3, Timestamp(seconds=1666707513)),
-              window.TimestampedValue(5, Timestamp(seconds=1666707515)),
-              window.TimestampedValue(6, Timestamp(seconds=1666707516)),
-              window.TimestampedValue(7, Timestamp(seconds=1666707517)),
-              window.TimestampedValue(8, Timestamp(seconds=1666707518))
-          ])
-          | beam.WindowInto(window.SlidingWindows(10, 5))
-          | beam.CombineGlobally(beam.combiners.ToListCombineFn()).
-          without_defaults().with_fanout(7))
-      assert_that(result, has_expected_values)
-
-  def test_combining_with_session_windows_and_fanout(self):
-    options = PipelineOptions()
-    options.view_as(StandardOptions).streaming = True
-    with TestPipeline(options=options) as p:
-
-      def has_expected_values(actual):
-        from hamcrest.core import assert_that as hamcrest_assert
-        from hamcrest.library.collection import only_contains
-        ordered = sorted(actual)
-
-        hamcrest_assert(ordered, only_contains([0, 1, 2, 3], [5, 6, 7, 8]))
-
-      result = (
-          p
-          | beam.Create([
-              window.TimestampedValue(0, Timestamp(seconds=1666707510)),
-              window.TimestampedValue(1, Timestamp(seconds=1666707511)),
-              window.TimestampedValue(2, Timestamp(seconds=1666707512)),
-              window.TimestampedValue(3, Timestamp(seconds=1666707513)),
-              window.TimestampedValue(5, Timestamp(seconds=1666707515)),
-              window.TimestampedValue(6, Timestamp(seconds=1666707516)),
-              window.TimestampedValue(7, Timestamp(seconds=1666707517)),
-              window.TimestampedValue(8, Timestamp(seconds=1666707518))
-          ])
-          | beam.WindowInto(window.Sessions(2))
-          | beam.CombineGlobally(beam.combiners.ToListCombineFn()).
-          without_defaults().with_fanout(7))
-
-      assert_that(result, has_expected_values)
-
   def test_MeanCombineFn_combine(self):
     with TestPipeline() as p:
       input = (
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 317844170ca..978e0e1eac3 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2743,8 +2743,6 @@ class _CombinePerKeyWithHotKeyFanout(PTransform):
   def expand(self, pcoll):
 
     from apache_beam.transforms.trigger import AccumulationMode
-    from apache_beam.transforms.util import _IdentityWindowFn
-
     combine_fn = self._combine_fn
     fanout_fn = self._fanout_fn
 
@@ -2804,15 +2802,11 @@ class _CombinePerKeyWithHotKeyFanout(PTransform):
     precombined_hot = (
         hot
         # Avoid double counting that may happen with stacked accumulating mode.
-        | 'ForceDiscardingAccumulation' >> WindowInto(
-            _IdentityWindowFn(pcoll.windowing.windowfn.get_window_coder()),
-            trigger=pcoll.windowing.triggerfn,
-            accumulation_mode=AccumulationMode.DISCARDING,
-            timestamp_combiner=pcoll.windowing.timestamp_combiner,
-            allowed_lateness=pcoll.windowing.allowed_lateness)
+        | 'WindowIntoDiscarding' >> WindowInto(
+            pcoll.windowing, accumulation_mode=AccumulationMode.DISCARDING)
         | CombinePerKey(PreCombineFn())
-        | Map(StripNonce))
-
+        | Map(StripNonce)
+        | 'WindowIntoOriginal' >> WindowInto(pcoll.windowing))
     return ((cold, precombined_hot)
             | Flatten()
             | CombinePerKey(PostCombineFn()))

Reply via email to