This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4fcb951 [BEAM-9487] Raise error in GroupByKey on invalid PCollections
new e91cb94 Merge pull request #14780 from [BEAM-9487] Raise error in GroupByKey on invalid PCollections
4fcb951 is described below
commit 4fcb9513de17e177e573ab2bb99e3319ccfa1d54
Author: zhoufek <[email protected]>
AuthorDate: Tue May 11 11:39:29 2021 -0400
[BEAM-9487] Raise error in GroupByKey on invalid PCollections
---
.../apache_beam/runners/interactive/pipeline_instrument_test.py | 5 ++++-
sdks/python/apache_beam/testing/util.py | 2 ++
sdks/python/apache_beam/transforms/core.py | 9 +++++++++
sdks/python/apache_beam/transforms/ptransform_test.py | 9 +++++++++
sdks/python/apache_beam/transforms/util_test.py | 2 +-
5 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index 56d3734..ede13cf 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -40,6 +40,9 @@ from apache_beam.testing.test_stream import TestStream
class PipelineInstrumentTest(unittest.TestCase):
+ def setUp(self):
+ ie.new_env()
+
def cache_key_of(self, name, pcoll):
return repr(
instr.CacheKey(
@@ -186,7 +189,7 @@ class PipelineInstrumentTest(unittest.TestCase):
# Add some extra PTransform afterwards to make sure that only the unbounded
# sources remain.
- c = (a, b) | beam.CoGroupByKey()
+ c = (a, b) | beam.Flatten()
_ = c | beam.Map(lambda x: x)
ib.watch(locals())
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index 326b239..5951581 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -282,11 +282,13 @@ def assert_that(
pcoll = pcoll | ParDo(ReifyTimestampWindow())
keyed_singleton = pcoll.pipeline | Create([(None, None)])
+ keyed_singleton.is_bounded = True
if use_global_window:
pcoll = pcoll | WindowInto(window.GlobalWindows())
keyed_actual = pcoll | "ToVoidKey" >> Map(lambda v: (None, v))
+ keyed_actual.is_bounded = True
# This is a CoGroupByKey so that the matcher always runs, even if the
# PCollection is empty.
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index a89cefa..54fd139 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2314,6 +2314,15 @@ class GroupByKey(PTransform):
key_type, typehints.WindowedValue[value_type]]] # type: ignore[misc]
def expand(self, pcoll):
+ from apache_beam.transforms.trigger import DefaultTrigger
+ windowing = pcoll.windowing
+ if not pcoll.is_bounded and isinstance(
+ windowing.windowfn, GlobalWindows) and isinstance(windowing.triggerfn,
+ DefaultTrigger):
+ raise ValueError(
+ 'GroupByKey cannot be applied to an unbounded ' +
+ 'PCollection with global windowing and a default trigger')
+
return pvalue.PCollection.from_(pcoll)
def infer_output_type(self, input_type):
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 8e5be54..61e8a0a 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -44,6 +44,7 @@ from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.portability import common_urns
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import WindowInto
@@ -471,6 +472,14 @@ class PTransformTest(unittest.TestCase):
result = pcoll | 'Group' >> beam.GroupByKey() | _SortLists
assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
+ def test_group_by_key_unbounded_global_default_trigger(self):
+ with self.assertRaisesRegex(
+ ValueError,
+ 'GroupByKey cannot be applied to an unbounded PCollection with ' +
+ 'global windowing and a default trigger'):
+ with TestPipeline() as pipeline:
+ pipeline | TestStream() | beam.GroupByKey()
+
def test_group_by_key_reiteration(self):
class MyDoFn(beam.DoFn):
def process(self, gbk_result):
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index df76d15..14878c4 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -713,8 +713,8 @@ class GroupIntoBatchesTest(unittest.TestCase):
max_buffering_duration_secs,
fake_clock)
| "count elements in batch" >> Map(lambda x: (None, len(x[1])))
- | "global window" >> WindowInto(GlobalWindows())
| GroupByKey()
+ | "global window" >> WindowInto(GlobalWindows())
| FlatMapTuple(lambda k, vs: vs))
# Window duration is 6 and batch size is 5, so output batch size