This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fcb951  [BEAM-9487] Raise error in GroupByKey on invalid PCollections
     new e91cb94  Merge pull request #14780 from [BEAM-9487] Raise error in GroupByKey on invalid PCollections
4fcb951 is described below

commit 4fcb9513de17e177e573ab2bb99e3319ccfa1d54
Author: zhoufek <[email protected]>
AuthorDate: Tue May 11 11:39:29 2021 -0400

    [BEAM-9487] Raise error in GroupByKey on invalid PCollections
---
 .../apache_beam/runners/interactive/pipeline_instrument_test.py  | 5 ++++-
 sdks/python/apache_beam/testing/util.py                          | 2 ++
 sdks/python/apache_beam/transforms/core.py                       | 9 +++++++++
 sdks/python/apache_beam/transforms/ptransform_test.py            | 9 +++++++++
 sdks/python/apache_beam/transforms/util_test.py                  | 2 +-
 5 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index 56d3734..ede13cf 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -40,6 +40,9 @@ from apache_beam.testing.test_stream import TestStream
 
 
 class PipelineInstrumentTest(unittest.TestCase):
+  def setUp(self):
+    ie.new_env()
+
   def cache_key_of(self, name, pcoll):
     return repr(
         instr.CacheKey(
@@ -186,7 +189,7 @@ class PipelineInstrumentTest(unittest.TestCase):
 
     # Add some extra PTransform afterwards to make sure that only the unbounded
     # sources remain.
-    c = (a, b) | beam.CoGroupByKey()
+    c = (a, b) | beam.Flatten()
     _ = c | beam.Map(lambda x: x)
 
     ib.watch(locals())
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index 326b239..5951581 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -282,11 +282,13 @@ def assert_that(
         pcoll = pcoll | ParDo(ReifyTimestampWindow())
 
       keyed_singleton = pcoll.pipeline | Create([(None, None)])
+      keyed_singleton.is_bounded = True
 
       if use_global_window:
         pcoll = pcoll | WindowInto(window.GlobalWindows())
 
       keyed_actual = pcoll | "ToVoidKey" >> Map(lambda v: (None, v))
+      keyed_actual.is_bounded = True
 
       # This is a CoGroupByKey so that the matcher always runs, even if the
       # PCollection is empty.
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index a89cefa..54fd139 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2314,6 +2314,15 @@ class GroupByKey(PTransform):
           key_type, typehints.WindowedValue[value_type]]]  # type: ignore[misc]
 
   def expand(self, pcoll):
+    from apache_beam.transforms.trigger import DefaultTrigger
+    windowing = pcoll.windowing
+    if not pcoll.is_bounded and isinstance(
+        windowing.windowfn, GlobalWindows) and isinstance(windowing.triggerfn,
+                                                          DefaultTrigger):
+      raise ValueError(
+          'GroupByKey cannot be applied to an unbounded ' +
+          'PCollection with global windowing and a default trigger')
+
     return pvalue.PCollection.from_(pcoll)
 
   def infer_output_type(self, input_type):
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 8e5be54..61e8a0a 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -44,6 +44,7 @@ from apache_beam.metrics.metric import MetricsFilter
 from apache_beam.options.pipeline_options import TypeOptions
 from apache_beam.portability import common_urns
 from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.transforms import WindowInto
@@ -471,6 +472,14 @@ class PTransformTest(unittest.TestCase):
       result = pcoll | 'Group' >> beam.GroupByKey() | _SortLists
       assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
 
+  def test_group_by_key_unbounded_global_default_trigger(self):
+    with self.assertRaisesRegex(
+        ValueError,
+        'GroupByKey cannot be applied to an unbounded PCollection with ' +
+        'global windowing and a default trigger'):
+      with TestPipeline() as pipeline:
+        pipeline | TestStream() | beam.GroupByKey()
+
   def test_group_by_key_reiteration(self):
     class MyDoFn(beam.DoFn):
       def process(self, gbk_result):
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index df76d15..14878c4 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -713,8 +713,8 @@ class GroupIntoBatchesTest(unittest.TestCase):
               max_buffering_duration_secs,
               fake_clock)
           | "count elements in batch" >> Map(lambda x: (None, len(x[1])))
-          | "global window" >> WindowInto(GlobalWindows())
           | GroupByKey()
+          | "global window" >> WindowInto(GlobalWindows())
           | FlatMapTuple(lambda k, vs: vs))
 
       # Window duration is 6 and batch size is 5, so output batch size

Reply via email to