This is an automated email from the ASF dual-hosted git repository.
udim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 706e20f [BEAM-9487] Disable allowing unsafe triggers by default
new cf8e08f Merge pull request #15340 from zhoufek/gbk_233
706e20f is described below
commit 706e20f7c3f0dafe834981e2dd2715082d38e548
Author: zhoufek <[email protected]>
AuthorDate: Tue Aug 17 09:03:56 2021 -0400
[BEAM-9487] Disable allowing unsafe triggers by default
---
CHANGES.md | 6 +++--
.../python/apache_beam/options/pipeline_options.py | 3 +--
sdks/python/apache_beam/transforms/core.py | 29 ++++++++++------------
3 files changed, 18 insertions(+), 20 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 6da170d..6fc25b8 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,7 +63,8 @@
## Breaking Changes
-* X behavior was changed
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Python GBK by default will fail on unbounded PCollections that have global
windowing and a default trigger. The `--allow_unsafe_triggers` flag can be used
to override this.
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
+* Python GBK will fail if it detects an unsafe trigger unless the
`--allow_unsafe_triggers` flag is set.
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
## Deprecations
@@ -123,7 +124,8 @@
## Deprecations
-* X behavior is deprecated and will be removed in X versions
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Python GBK will stop supporting unbounded PCollections that have global
windowing and a default trigger in Beam 2.33. This can be overridden with
`--allow_unsafe_triggers`.
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
+* Python GBK will start requiring safe triggers or the
`--allow_unsafe_triggers` flag starting with Beam 2.33.
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
## Known Issues
diff --git a/sdks/python/apache_beam/options/pipeline_options.py
b/sdks/python/apache_beam/options/pipeline_options.py
index c597b4a..bba56ef 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -534,8 +534,7 @@ class TypeOptions(PipelineOptions):
'compatibility. See BEAM-11719.')
parser.add_argument(
'--allow_unsafe_triggers',
- # TODO(BEAM-9487): Set to False for Beam 2.33
- default=True,
+ default=False,
action='store_true',
help='Allow the use of unsafe triggers. Unsafe triggers have the '
'potential to cause data loss due to finishing and/or never having '
diff --git a/sdks/python/apache_beam/transforms/core.py
b/sdks/python/apache_beam/transforms/core.py
index 93d29ed..25b05df 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2320,10 +2320,10 @@ class GroupByKey(PTransform):
if pcoll.pipeline.allow_unsafe_triggers:
# TODO(BEAM-9487) Change comment for Beam 2.33
_LOGGER.warning(
- 'PCollection passed to GroupByKey (label: %s) is unbounded, has a '
- 'global window, and uses a default trigger. This will no longer '
- 'be allowed starting with Beam 2.33 unless '
- '--allow_unsafe_triggers is set.',
+ '%s: PCollection passed to GroupByKey is unbounded, has a global '
+ 'window, and uses a default trigger. This is being allowed '
+ 'because --allow_unsafe_triggers is set, but it may prevent '
+ 'data from making it through the pipeline.',
self.label)
else:
raise ValueError(
@@ -2332,22 +2332,19 @@ class GroupByKey(PTransform):
unsafe_reason = trigger.may_lose_data(windowing)
if unsafe_reason != DataLossReason.NO_POTENTIAL_LOSS:
+ reason_msg = str(unsafe_reason).replace('DataLossReason.', '')
if pcoll.pipeline.allow_unsafe_triggers:
- # TODO(BEAM-9487): Switch back to this log for Beam 2.33.
- # _LOGGER.warning(
- # 'Skipping trigger safety check. '
- # 'This could lead to incomplete or missing groups.')
_LOGGER.warning(
- '%s: Unsafe trigger type (%s) detected. Starting with '
- 'Beam 2.33, this will raise an error by default. '
- 'Either change the pipeline to use a safe trigger or '
- 'set the --allow_unsafe_triggers flag.',
+ '%s: Unsafe trigger `%s` detected (reason: %s). This is '
+ 'being allowed because --allow_unsafe_triggers is set. This could '
+ 'lead to missing or incomplete groups.',
self.label,
- unsafe_reason)
+ trigger,
+ reason_msg)
else:
- msg = 'Unsafe trigger: `{}` may lose data. '.format(trigger)
- msg += 'Reason: {}. '.format(
- str(unsafe_reason).replace('DataLossReason.', ''))
+ msg = '{}: Unsafe trigger: `{}` may lose data. '.format(
+ self.label, trigger)
+ msg += 'Reason: {}. '.format(reason_msg)
msg += 'This can be overriden with the --allow_unsafe_triggers flag.'
raise ValueError(msg)