kennknowles commented on a change in pull request #15603:
URL: https://github.com/apache/beam/pull/15603#discussion_r726571848
##########
File path: sdks/python/apache_beam/transforms/trigger.py
##########
@@ -850,7 +855,12 @@ class AfterAll(_ParallelTriggerFn):
combine_op = all
def may_lose_data(self, windowing):
- return reduce(or_, (t.may_lose_data(windowing) for t in self.triggers))
+ """If all sub-triggers may finish, then this may finish."""
Review comment:
As noted below, I believe this will finish once all sub-triggers have
_fired_, so you can always return unsafe.
https://github.com/apache/beam/blob/d2b785a98cd644e2a683c8895375a3fdc2c96ece/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java#L33
https://github.com/apache/beam/blob/d2b785a98cd644e2a683c8895375a3fdc2c96ece/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java#L245-L247
##########
File path: sdks/python/apache_beam/transforms/trigger_test.py
##########
@@ -470,93 +470,58 @@ def
test_after_watermark_no_allowed_lateness_safe_late(self):
0,
DataLossReason.NO_POTENTIAL_LOSS)
- def test_after_watermark_safe_late(self):
+ def test_after_watermark_allowed_lateness_safe_late(self):
self._test(
AfterWatermark(late=DefaultTrigger()),
60,
DataLossReason.NO_POTENTIAL_LOSS)
- def test_after_watermark_no_allowed_lateness_may_finish_late(self):
- self._test(
- AfterWatermark(late=AfterProcessingTime()),
- 0,
- DataLossReason.NO_POTENTIAL_LOSS)
-
- def test_after_watermark_may_finish_late(self):
- self._test(
- AfterWatermark(late=AfterProcessingTime()),
- 60,
- DataLossReason.NO_POTENTIAL_LOSS)
-
- def test_after_watermark_no_allowed_lateness_condition_late(self):
- self._test(
- AfterWatermark(late=AfterCount(5)), 0,
DataLossReason.NO_POTENTIAL_LOSS)
-
- def test_after_watermark_condition_late(self):
- self._test(
- AfterWatermark(late=AfterCount(5)),
- 60,
- DataLossReason.NO_POTENTIAL_LOSS)
-
- def test_after_count_one(self):
- self._test(AfterCount(1), 0, DataLossReason.MAY_FINISH)
-
- def test_after_count_gt_one(self):
- self._test(
- AfterCount(2),
- 0,
- DataLossReason.MAY_FINISH | DataLossReason.CONDITION_NOT_GUARANTEED)
+ def test_after_count(self):
+ self._test(AfterCount(42), 0, DataLossReason.MAY_FINISH)
def test_repeatedly_safe_underlying(self):
self._test(
Repeatedly(DefaultTrigger()), 0, DataLossReason.NO_POTENTIAL_LOSS)
- def test_repeatedly_may_finish_underlying(self):
- self._test(Repeatedly(AfterCount(1)), 0, DataLossReason.NO_POTENTIAL_LOSS)
-
- def test_repeatedly_condition_underlying(self):
- self._test(Repeatedly(AfterCount(2)), 0, DataLossReason.NO_POTENTIAL_LOSS)
+ def test_repeatedly_unsafe_underlying(self):
+ self._test(Repeatedly(AfterCount(42)), 0, DataLossReason.NO_POTENTIAL_LOSS)
- def test_after_any_some_unsafe(self):
+ def test_after_any_one_may_finish(self):
self._test(
- AfterAny(AfterCount(1), DefaultTrigger()),
- 0,
- DataLossReason.NO_POTENTIAL_LOSS)
-
- def test_after_any_same_reason(self):
- self._test(
- AfterAny(AfterCount(1), AfterProcessingTime()),
+ AfterAny(AfterCount(42), DefaultTrigger()),
0,
DataLossReason.MAY_FINISH)
- def test_after_any_different_reasons(self):
+ def test_after_any_all_safe(self):
self._test(
- AfterAny(AfterCount(2), AfterProcessingTime()),
+ AfterAny(Repeatedly(AfterCount(42)), DefaultTrigger()),
0,
- DataLossReason.MAY_FINISH | DataLossReason.CONDITION_NOT_GUARANTEED)
-
- def test_after_all_some_unsafe(self):
- self._test(
- AfterAll(AfterCount(1), DefaultTrigger()), 0,
DataLossReason.MAY_FINISH)
+ DataLossReason.NO_POTENTIAL_LOSS)
- def test_after_all_safe(self):
+ def test_after_all_some_may_finish(self):
self._test(
- AfterAll(Repeatedly(AfterCount(1)), DefaultTrigger()),
+ AfterAll(AfterCount(1), DefaultTrigger()),
Review comment:
I believe AfterAll will finish after DefaultTrigger fires once.
##########
File path: sdks/python/apache_beam/transforms/trigger.py
##########
@@ -833,13 +839,12 @@ class AfterAny(_ParallelTriggerFn):
combine_op = any
def may_lose_data(self, windowing):
- reason = DataLossReason.NO_POTENTIAL_LOSS
- for trigger in self.triggers:
- t_reason = trigger.may_lose_data(windowing)
- if t_reason == DataLossReason.NO_POTENTIAL_LOSS:
- return t_reason
- reason |= t_reason
- return reason
+ """If any sub-trigger may finish, this one may finish."""
Review comment:
This one is also always a data-loss risk.
https://github.com/apache/beam/blob/d2b785a98cd644e2a683c8895375a3fdc2c96ece/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java#L34
https://github.com/apache/beam/blob/d2b785a98cd644e2a683c8895375a3fdc2c96ece/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java#L245-L247
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]