zhoufek commented on a change in pull request #15603:
URL: https://github.com/apache/beam/pull/15603#discussion_r718524059
##########
File path: sdks/python/apache_beam/transforms/trigger.py
##########
@@ -161,12 +159,42 @@ def with_prefix(self, prefix):
class DataLossReason(Flag):
- """Enum defining potential reasons that a trigger may cause data loss."""
+ """Enum defining potential reasons that a trigger may cause data loss.
+
+ These flags should only cover when the trigger is the cause, though windowing
+ can be taken into account. For instance, AfterWatermark may not flag itself
+ as finishing if the windowing doesn't allow lateness.
+ """
+
+ # Trigger will never be the source of data loss.
NO_POTENTIAL_LOSS = 0
+
+ # Trigger may finish. In this case, data that comes in after the trigger may
+ # be lost. Example: AfterCount(1) will stop firing after the first element.
MAY_FINISH = auto()
+
+ # Trigger has a condition that is not guaranteed to ever be met. In this
+ # case, data that comes in may be lost. Example: AfterCount(42) will lose
+ # 20 records if only 20 come in, since the condition to fire was never met.
CONDITION_NOT_GUARANTEED = auto()
Review comment:
Something that still isn't clear to me:
In Beam documentation, we make a note:
```
It is important to note that if, for example, you specify
.elementCountAtLeast(50) and only 32 elements arrive, those 32 elements sit
around forever. If the 32 elements are important to you, consider using
composite triggers to combine multiple conditions. This allows you to specify
multiple firing conditions such as “fire either when I receive 50 elements, or
every 1 second”.
```
([link](https://beam.apache.org/documentation/programming-guide/#data-driven-triggers))
Based on discussion, though, it sounded like those 32 records would become
available at GC time. Is this warning in the documentation only for those who
want the data before GC?
Or, putting it in the context of GBK, where this is actually used, an
[existing test](https://github.com/apache/beam/blob/0111cff88025f0dc783a0890078b769139c8ae36/sdks/python/apache_beam/transforms/ptransform_test.py#L500)
shows that `AfterCount(5)` won't emit the data. If I change it to
`Repeatedly(AfterCount(5))`, the behavior is the same, so in practice
`Repeatedly(AfterCount(5))` still seems unsafe. Is this caused by the runner
or the windowing in a way that wouldn't affect a real pipeline?
##########
File path: sdks/python/apache_beam/transforms/trigger.py
##########
Review comment:
Something that still isn't clear to me:
In Beam documentation, we make a note:
```
It is important to note that if, for example, you specify
.elementCountAtLeast(50) and only 32 elements arrive, those 32 elements sit
around forever. If the 32 elements are important to you, consider using
composite triggers to combine multiple conditions. This allows you to specify
multiple firing conditions such as “fire either when I receive 50 elements, or
every 1 second”.
```
([link](https://beam.apache.org/documentation/programming-guide/#data-driven-triggers))
Based on discussion, though, it sounded like those 32 records would become
available at GC time. Is this warning in the documentation only for those who
want the data before GC?
Or, putting it in the context of GBK, where this is actually used, an
[existing test](https://github.com/apache/beam/blob/0111cff88025f0dc783a0890078b769139c8ae36/sdks/python/apache_beam/transforms/ptransform_test.py#L500)
shows that `AfterCount(5)` won't emit the data. If I change it to
`Repeatedly(AfterCount(5))`, the behavior is the same, so in practice
`Repeatedly(AfterCount(5))` still seems unsafe. Is this caused by the runner
in a way that won't affect a real pipeline? Does global windowing make this
unsafe?
##########
File path: sdks/python/apache_beam/transforms/trigger.py
##########
Review comment:
Something that still isn't clear to me:
In Beam documentation, we make a note:
```
It is important to note that if, for example, you specify
.elementCountAtLeast(50) and only 32 elements arrive, those 32 elements sit
around forever. If the 32 elements are important to you, consider using
composite triggers to combine multiple conditions. This allows you to specify
multiple firing conditions such as “fire either when I receive 50 elements, or
every 1 second”.
```
([link](https://beam.apache.org/documentation/programming-guide/#data-driven-triggers))
Based on discussion, though, it sounded like those 32 records would become
available at GC time. Is this warning in the documentation only for those who
want the data before GC?
Or, putting it in the context of GBK, where this is actually used, an
[existing test](https://github.com/apache/beam/blob/0111cff88025f0dc783a0890078b769139c8ae36/sdks/python/apache_beam/transforms/ptransform_test.py#L500)
shows that `AfterCount(5)` won't emit the data. If I change it to
`Repeatedly(AfterCount(5))`, the behavior is the same, so in practice
`Repeatedly(AfterCount(5))` still seems unsafe. Should we account for whether
or not the windowing is global when making decisions based on GC?
I'm asking because I'm wondering whether the bug was not so much in
`Repeatedly` as in `AfterCount` not accounting for windowing.
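To make the cases in this question concrete, here is a toy model in plain Python. It is not Beam code and does not reflect how any runner is implemented. The function `simulate_count_trigger` and its `flush_at_gc` parameter are hypothetical names for this sketch; `flush_at_gc` stands in for the open question of whether buffered elements are emitted when the window is garbage collected.

```python
def simulate_count_trigger(elements, threshold, repeatedly=False,
                           flush_at_gc=True):
    """Toy model of a count-based trigger over a single window.

    Returns the list of panes (each a list of elements) that would be emitted.
    This only models the behaviors discussed in the comment; it is an
    assumption-laden sketch, not Beam's trigger semantics.
    """
    panes = []
    buffer = []
    finished = False
    for element in elements:
        if finished:
            # A finished (non-repeating) trigger drops later elements.
            continue
        buffer.append(element)
        if len(buffer) >= threshold:
            # The count condition is met: emit a pane and reset the buffer.
            panes.append(buffer)
            buffer = []
            finished = not repeatedly
    if buffer and flush_at_gc:
        # Assumed behavior: leftover elements are emitted at window GC time.
        panes.append(buffer)
    return panes


# Four elements, threshold of five, as in the test referenced above.
without_gc_flush = simulate_count_trigger(
    [1, 2, 3, 4], 5, repeatedly=True, flush_at_gc=False)
with_gc_flush = simulate_count_trigger(
    [1, 2, 3, 4], 5, repeatedly=True, flush_at_gc=True)
```

With four elements and a threshold of 5, the model emits nothing when `flush_at_gc=False` (what the test appears to show) and a single pane of four elements when `flush_at_gc=True` (what the discussion suggested should happen).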
##########
File path: sdks/python/apache_beam/transforms/trigger.py
##########
Review comment:
Ok, so what _should_ be happening in the test is that the four elements
in the global window should be emitted at GC time, and GBK should process them
before the pipeline finishes, but a bug in the direct runner stops those four
elements from emitting. Is that right?
If so, I'm guessing I should remove this as a potential data loss reason. It
was in there because of the comment in the documentation and because the test
seemed to confirm it.
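For reference, a minimal sketch of what the enum might look like with that reason removed, using only the standard library. The helper `combine_reasons` is a hypothetical name added for illustration and is not part of this PR; it just shows that flags from nested triggers could still be OR-ed together when only `MAY_FINISH` remains.

```python
from enum import Flag, auto
from functools import reduce
from operator import or_


class DataLossReason(Flag):
    """Sketch of the enum with CONDITION_NOT_GUARANTEED dropped."""

    # Trigger will never be the source of data loss.
    NO_POTENTIAL_LOSS = 0

    # Trigger may finish; data arriving afterwards may be lost.
    MAY_FINISH = auto()


def combine_reasons(reasons):
    """Hypothetical helper: OR together the reasons of several sub-triggers."""
    return reduce(or_, reasons, DataLossReason.NO_POTENTIAL_LOSS)


safe = combine_reasons([DataLossReason.NO_POTENTIAL_LOSS])
unsafe = combine_reasons(
    [DataLossReason.NO_POTENTIAL_LOSS, DataLossReason.MAY_FINISH])
```

Because `NO_POTENTIAL_LOSS` is the zero value, a combined result is falsy only when no sub-trigger reported a reason, so callers can keep checking the result against `NO_POTENTIAL_LOSS`.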