This is an automated email from the ASF dual-hosted git repository.

udim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c5a6988  [BEAM-9487] Disable GBK safety checks by default
     new f2bbae8  Merge pull request #15003 from zhoufek/aut
c5a6988 is described below

commit c5a6988835046f635aa54ce95414ab97ceb1940b
Author: zhoufek <[email protected]>
AuthorDate: Fri Jun 11 15:29:02 2021 -0400

    [BEAM-9487] Disable GBK safety checks by default
---
 CHANGES.md                                         |  3 ++
 .../python/apache_beam/options/pipeline_options.py |  3 +-
 sdks/python/apache_beam/transforms/core.py         | 39 ++++++++++++++++------
 .../apache_beam/transforms/ptransform_test.py      |  8 +++--
 sdks/python/apache_beam/transforms/trigger.py      |  6 ++--
 5 files changed, 42 insertions(+), 17 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index b231074..98a5fd9 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -92,6 +92,7 @@
 
 * `CREATE FUNCTION` DDL statement added to Calcite SQL syntax. `JAR` and 
`AGGREGATE` are now reserved keywords. 
([BEAM-12339](https://issues.apache.org/jira/browse/BEAM-12339)).
 * Flink 1.13 is now supported by the Flink runner 
([BEAM-12277](https://issues.apache.org/jira/browse/BEAM-12277)).
+* Python `TriggerFn` has a new `may_lose_data` method to signal potential data 
loss. The default implementation assumes the trigger is safe (necessary for backwards compatibility). 
See Deprecations for potential impact of overriding this. 
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
 * X feature added (Java/Python) 
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 
 ## Breaking Changes
@@ -107,6 +108,8 @@
 ## Deprecations
 
 * X behavior is deprecated and will be removed in X versions 
([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Python GBK will stop supporting unbounded PCollections that have global 
windowing and a default trigger in Beam 2.33. This can be overridden with 
`--allow_unsafe_triggers`. 
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
+* Python GBK will start requiring safe triggers or the 
`--allow_unsafe_triggers` flag starting with Beam 2.33. 
([BEAM-9487](https://issues.apache.org/jira/browse/BEAM-9487)).
 
 ## Known Issues
 
diff --git a/sdks/python/apache_beam/options/pipeline_options.py 
b/sdks/python/apache_beam/options/pipeline_options.py
index 9cf9eb6..5073a86 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -534,7 +534,8 @@ class TypeOptions(PipelineOptions):
         'compatibility. See BEAM-11719.')
     parser.add_argument(
         '--allow_unsafe_triggers',
-        default=False,
+        # TODO(BEAM-9487): Set to False for Beam 2.33
+        default=True,
         action='store_true',
         help='Allow the use of unsafe triggers. Unsafe triggers have the '
         'potential to cause data loss due to finishing and/or never having '
diff --git a/sdks/python/apache_beam/transforms/core.py 
b/sdks/python/apache_beam/transforms/core.py
index acc3c70..897046d 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2321,22 +2321,39 @@ class GroupByKey(PTransform):
     if not pcoll.is_bounded and isinstance(
         windowing.windowfn, GlobalWindows) and isinstance(trigger,
                                                           DefaultTrigger):
-      raise ValueError(
-          'GroupByKey cannot be applied to an unbounded ' +
-          'PCollection with global windowing and a default trigger')
-
-    if not pcoll.pipeline.allow_unsafe_triggers:
-      unsafe_reason = trigger.may_lose_data(windowing)
-      if unsafe_reason != DataLossReason.NO_POTENTIAL_LOSS:
+      if pcoll.pipeline.allow_unsafe_triggers:
+        # TODO(BEAM-9487) Change comment for Beam 2.33
+        _LOGGER.warning(
+            'PCollection passed to GroupByKey (label: %s) is unbounded, has a '
+            'global window, and uses a default trigger. This will no longer '
+            'be allowed starting with Beam 2.33 unless '
+            '--allow_unsafe_triggers is set.',
+            self.label)
+      else:
+        raise ValueError(
+            'GroupByKey cannot be applied to an unbounded ' +
+            'PCollection with global windowing and a default trigger')
+
+    unsafe_reason = trigger.may_lose_data(windowing)
+    if unsafe_reason != DataLossReason.NO_POTENTIAL_LOSS:
+      if pcoll.pipeline.allow_unsafe_triggers:
+        # TODO(BEAM-9487): Switch back to this log for Beam 2.33.
+        # _LOGGER.warning(
+        #   'Skipping trigger safety check. '
+        #   'This could lead to incomplete or missing groups.')
+        _LOGGER.warning(
+            '%s: Unsafe trigger type (%s) detected. Starting with '
+            'Beam 2.33, this will raise an error by default. '
+            'Either change the pipeline to use a safe trigger or '
+            'set the --allow_unsafe_triggers flag.',
+            self.label,
+            unsafe_reason)
+      else:
         msg = 'Unsafe trigger: `{}` may lose data. '.format(trigger)
         msg += 'Reason: {}. '.format(
             str(unsafe_reason).replace('DataLossReason.', ''))
         msg += 'This can be overriden with the --allow_unsafe_triggers flag.'
         raise ValueError(msg)
-    else:
-      _LOGGER.warning(
-          'Skipping trigger safety check. '
-          'This could lead to incomplete or missing groups.')
 
     return pvalue.PCollection.from_(pcoll)
 
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py 
b/sdks/python/apache_beam/transforms/ptransform_test.py
index 64c6847..93ab1d4 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -477,16 +477,20 @@ class PTransformTest(unittest.TestCase):
       assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
 
   def test_group_by_key_unbounded_global_default_trigger(self):
+    test_options = PipelineOptions()
+    test_options.view_as(TypeOptions).allow_unsafe_triggers = False
     with self.assertRaisesRegex(
         ValueError,
         'GroupByKey cannot be applied to an unbounded PCollection with ' +
         'global windowing and a default trigger'):
-      with TestPipeline() as pipeline:
+      with TestPipeline(options=test_options) as pipeline:
         pipeline | TestStream() | beam.GroupByKey()
 
   def test_group_by_key_unsafe_trigger(self):
+    test_options = PipelineOptions()
+    test_options.view_as(TypeOptions).allow_unsafe_triggers = False
     with self.assertRaisesRegex(ValueError, 'Unsafe trigger'):
-      with TestPipeline() as pipeline:
+      with TestPipeline(options=test_options) as pipeline:
         _ = (
             pipeline
             | beam.Create([(None, None)])
diff --git a/sdks/python/apache_beam/transforms/trigger.py 
b/sdks/python/apache_beam/transforms/trigger.py
index 6569d3f..226252c 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -248,8 +248,7 @@ class TriggerFn(metaclass=ABCMeta):
     """Clear any state and timers used by this TriggerFn."""
     pass
 
-  @abstractmethod
-  def may_lose_data(self, windowing):
+  def may_lose_data(self, unused_windowing):
     # type: (core.Windowing) -> DataLossReason
 
     """Returns whether or not this trigger could cause data loss.
@@ -283,7 +282,8 @@ class TriggerFn(metaclass=ABCMeta):
         data loss can result from finishing or not having the condition met,
         the result will be DataLossReason.MAY_FINISH|CONDITION_NOT_GUARANTEED.
     """
-    pass
+    # For backwards compatibility's sake, we're assuming the trigger is safe.
+    return DataLossReason.NO_POTENTIAL_LOSS
 
 
 # pylint: enable=unused-argument

Reply via email to