This is an automated email from the ASF dual-hosted git repository.
udim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c5a6988 [BEAM-9487] Disable GBK safety checks by default
new f2bbae8 Merge pull request #15003 from zhoufek/aut
c5a6988 is described below
commit c5a6988835046f635aa54ce95414ab97ceb1940b
Author: zhoufek <[email protected]>
AuthorDate: Fri Jun 11 15:29:02 2021 -0400
[BEAM-9487] Disable GBK safety checks by default
---
CHANGES.md | 3 ++
.../python/apache_beam/options/pipeline_options.py | 3 +-
sdks/python/apache_beam/transforms/core.py | 39 ++++++++++++++++------
.../apache_beam/transforms/ptransform_test.py | 8 +++--
sdks/python/apache_beam/transforms/trigger.py | 6 ++--
5 files changed, 42 insertions(+), 17 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index b231074..98a5fd9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -92,6 +92,7 @@
* `CREATE FUNCTION` DDL statement added to Calcite SQL syntax. `JAR` and
`AGGREGATE` are now reserved keywords.
([BEAM-12339](https://issues.apache.org/jira/browse/BEAM-12339)).
* Flink 1.13 is now supported by the Flink runner
([BEAM-12277](https://issues.apache.org/jira/browse/BEAM-12277)).
+* Python `TriggerFn` has a new `may_lose_data` method to signal potential data
loss. Default behavior assumes safe (necessary for backwards compatibility).
See Deprecations for potential impact of overriding this.
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
* X feature added (Java/Python)
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
## Breaking Changes
@@ -107,6 +108,8 @@
## Deprecations
* X behavior is deprecated and will be removed in X versions
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Python GBK will stop supporting unbounded PCollections that have global
windowing and a default trigger in Beam 2.33. This can be overridden with
`--allow_unsafe_triggers`.
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
+* Python GBK will start requiring safe triggers or the
`--allow_unsafe_triggers` flag starting with Beam 2.33.
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
## Known Issues
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index 9cf9eb6..5073a86 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -534,7 +534,8 @@ class TypeOptions(PipelineOptions):
'compatibility. See BEAM-11719.')
parser.add_argument(
'--allow_unsafe_triggers',
- default=False,
+ # TODO(BEAM-9487): Set to False for Beam 2.33
+ default=True,
action='store_true',
help='Allow the use of unsafe triggers. Unsafe triggers have the '
'potential to cause data loss due to finishing and/or never having '
diff --git a/sdks/python/apache_beam/transforms/core.py
b/sdks/python/apache_beam/transforms/core.py
index acc3c70..897046d 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2321,22 +2321,39 @@ class GroupByKey(PTransform):
if not pcoll.is_bounded and isinstance(
windowing.windowfn, GlobalWindows) and isinstance(trigger,
DefaultTrigger):
- raise ValueError(
- 'GroupByKey cannot be applied to an unbounded ' +
- 'PCollection with global windowing and a default trigger')
-
- if not pcoll.pipeline.allow_unsafe_triggers:
- unsafe_reason = trigger.may_lose_data(windowing)
- if unsafe_reason != DataLossReason.NO_POTENTIAL_LOSS:
+ if pcoll.pipeline.allow_unsafe_triggers:
+ # TODO(BEAM-9487) Change comment for Beam 2.33
+ _LOGGER.warning(
+ 'PCollection passed to GroupByKey (label: %s) is unbounded, has a '
+ 'global window, and uses a default trigger. This will no longer '
+ 'be allowed starting with Beam 2.33 unless '
+ '--allow_unsafe_triggers is set.',
+ self.label)
+ else:
+ raise ValueError(
+ 'GroupByKey cannot be applied to an unbounded ' +
+ 'PCollection with global windowing and a default trigger')
+
+ unsafe_reason = trigger.may_lose_data(windowing)
+ if unsafe_reason != DataLossReason.NO_POTENTIAL_LOSS:
+ if pcoll.pipeline.allow_unsafe_triggers:
+ # TODO(BEAM-9487): Switch back to this log for Beam 2.33.
+ # _LOGGER.warning(
+ # 'Skipping trigger safety check. '
+ # 'This could lead to incomplete or missing groups.')
+ _LOGGER.warning(
+ '%s: Unsafe trigger type (%s) detected. Starting with '
+ 'Beam 2.33, this will raise an error by default. '
+ 'Either change the pipeline to use a safe trigger or '
+ 'set the --allow_unsafe_triggers flag.',
+ self.label,
+ unsafe_reason)
+ else:
msg = 'Unsafe trigger: `{}` may lose data. '.format(trigger)
msg += 'Reason: {}. '.format(
str(unsafe_reason).replace('DataLossReason.', ''))
msg += 'This can be overriden with the --allow_unsafe_triggers flag.'
raise ValueError(msg)
- else:
- _LOGGER.warning(
- 'Skipping trigger safety check. '
- 'This could lead to incomplete or missing groups.')
return pvalue.PCollection.from_(pcoll)
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py
b/sdks/python/apache_beam/transforms/ptransform_test.py
index 64c6847..93ab1d4 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -477,16 +477,20 @@ class PTransformTest(unittest.TestCase):
assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
def test_group_by_key_unbounded_global_default_trigger(self):
+ test_options = PipelineOptions()
+ test_options.view_as(TypeOptions).allow_unsafe_triggers = False
with self.assertRaisesRegex(
ValueError,
'GroupByKey cannot be applied to an unbounded PCollection with ' +
'global windowing and a default trigger'):
- with TestPipeline() as pipeline:
+ with TestPipeline(options=test_options) as pipeline:
pipeline | TestStream() | beam.GroupByKey()
def test_group_by_key_unsafe_trigger(self):
+ test_options = PipelineOptions()
+ test_options.view_as(TypeOptions).allow_unsafe_triggers = False
with self.assertRaisesRegex(ValueError, 'Unsafe trigger'):
- with TestPipeline() as pipeline:
+ with TestPipeline(options=test_options) as pipeline:
_ = (
pipeline
| beam.Create([(None, None)])
diff --git a/sdks/python/apache_beam/transforms/trigger.py
b/sdks/python/apache_beam/transforms/trigger.py
index 6569d3f..226252c 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -248,8 +248,7 @@ class TriggerFn(metaclass=ABCMeta):
"""Clear any state and timers used by this TriggerFn."""
pass
- @abstractmethod
- def may_lose_data(self, windowing):
+ def may_lose_data(self, unused_windowing):
# type: (core.Windowing) -> DataLossReason
"""Returns whether or not this trigger could cause data loss.
@@ -283,7 +282,8 @@ class TriggerFn(metaclass=ABCMeta):
data loss can result from finishing or not having the condition met,
the result will be DataLossReason.MAY_FINISH|CONDITION_NOT_GUARANTEED.
"""
- pass
+ # For backwards compatibility's sake, we're assuming the trigger is safe.
+ return DataLossReason.NO_POTENTIAL_LOSS
# pylint: enable=unused-argument