TheNeuralBit commented on issue #22723:
URL: https://github.com/apache/beam/issues/22723#issuecomment-1224493228

   Interesting. I'm not sure which of these is the oversight; perhaps @robertwb 
can comment as the original author. 
   
   Looking at the code: 
https://github.com/apache/beam/blob/c7f64264451af12ff6c7c0ef4bc95fd7ce0f5418/sdks/python/apache_beam/transforms/util.py#L539-L552
   
   In the `process` case we just yield the batch. This was likely written 
without considering the timestamp, but it turns out that Beam attaches the 
current element's timestamp in the OutputProcessor.
   In the `finish_bundle` case we wrap the batch in a 
`GlobalWindows.windowed_value`. This was likely just to satisfy some type check 
(static or in output processing), but it turns out we end up using 
`min_timestamp()`.
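
   To make the mismatch concrete, here is a minimal pure-Python sketch of the 
two code paths described above. It does not use Beam: `ToyGlobalBatcher`, 
`Emitted`, `run_bundle` and the `MIN_TIMESTAMP` stand-in are all hypothetical 
names for illustration, and the timestamp handling is an assumption modelled on 
the behaviour described in this comment, not the actual `util.py` code.

```python
import math
from dataclasses import dataclass

# Stand-in for Beam's minimum timestamp (assumption for this sketch only).
MIN_TIMESTAMP = -math.inf


@dataclass
class Emitted:
    batch: list
    timestamp: float


class ToyGlobalBatcher:
    """Hypothetical model of a globally windowed batching DoFn."""

    def __init__(self, batch_size):
        self._batch_size = batch_size
        self._batch = []

    def process(self, element, timestamp):
        self._batch.append(element)
        if len(self._batch) >= self._batch_size:
            batch, self._batch = self._batch, []
            # Models a bare `yield batch`: the output handling stamps the
            # batch with the timestamp of the element being processed.
            yield Emitted(batch, timestamp)

    def finish_bundle(self):
        if self._batch:
            batch, self._batch = self._batch, []
            # Models wrapping in a global-window value, which ends up
            # carrying the minimum timestamp.
            yield Emitted(batch, MIN_TIMESTAMP)


def run_bundle(batcher, timestamped_elements):
    """Feeds one bundle through the batcher and collects its outputs."""
    out = []
    for element, ts in timestamped_elements:
        out.extend(batcher.process(element, ts))
    out.extend(batcher.finish_bundle())
    return out


outputs = run_bundle(ToyGlobalBatcher(2), [("a", 10), ("b", 20), ("c", 30)])
for emitted in outputs:
    print(emitted)
```

   In this model the full batch carries the timestamp of the element that 
completed it, while the leftover batch flushed at the end of the bundle carries 
the minimum timestamp, which is the inconsistency under discussion.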
   
   As @scwhittle noted, the per-window version always uses the current window's 
max timestamp: 
https://github.com/apache/beam/blob/c7f64264451af12ff6c7c0ef4bc95fd7ce0f5418/sdks/python/apache_beam/transforms/util.py#L584-L585
   
https://github.com/apache/beam/blob/c7f64264451af12ff6c7c0ef4bc95fd7ce0f5418/sdks/python/apache_beam/transforms/util.py#L602-L603
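
   For comparison, a minimal pure-Python sketch of the per-window behaviour: 
every batch is stamped with its window's max timestamp, whether it is emitted 
because it filled up or because the bundle ended. `ToyWindow`, `fixed_window` 
and `batch_per_window` are hypothetical names, and integer microsecond 
timestamps with `max_timestamp = end - 1` are assumptions of this sketch, not 
the linked code.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyWindow:
    start: int  # inclusive, in microseconds (assumed unit)
    end: int    # exclusive, in microseconds (assumed unit)

    def max_timestamp(self):
        # Last timestamp that still belongs to this window.
        return self.end - 1


def fixed_window(timestamp, size):
    """Assigns a timestamp to a fixed window of the given size."""
    start = timestamp - timestamp % size
    return ToyWindow(start, start + size)


def batch_per_window(timestamped_elements, window_size, batch_size):
    """Batches elements per window; every output uses the window's max timestamp."""
    pending = defaultdict(list)
    out = []
    for element, ts in timestamped_elements:
        window = fixed_window(ts, window_size)
        pending[window].append(element)
        if len(pending[window]) >= batch_size:
            # Full batch: emit with the window's max timestamp.
            out.append((pending.pop(window), window.max_timestamp()))
    # End of bundle: flush leftovers, again with the window's max timestamp.
    for window, batch in pending.items():
        if batch:
            out.append((batch, window.max_timestamp()))
    return out


results = batch_per_window(
    [("a", 3), ("b", 7), ("c", 12)], window_size=10, batch_size=2)
print(results)
```

   Unlike the global case, both emission paths here agree on the timestamp, so 
the output does not depend on where a bundle boundary happens to fall.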
   
   I took a look at what the `GroupIntoBatches` implementations do here, but 
AFAICT they're not opinionated either, and rely on the SDK worker to decide 
what timestamp to use: 
https://github.com/apache/beam/blob/c7f64264451af12ff6c7c0ef4bc95fd7ce0f5418/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java#L545
   
   Presumably this is typically the timestamp of the most recent element, i.e. 
the one that completes a batch. I'm not sure what happens when a timer fires, 
though.

