On Mon, May 10, 2021 at 7:40 PM Reuven Lax <re...@google.com> wrote:

> Hi,
>
> I've been looking at the implementation of GroupIntoBatches (hoping to add
> support to group based on byte size), and I have a few questions about the
> current implementation.
>
> 1. I noticed that the transform does not preserve input timestamps. The
> output timestamp is not explicitly set, so it will be whatever the default
> output timestamp is. Confusingly, this will depend on which path is taken.
> If the batch is output while processing an element, then the timestamp of
> that element will be used. If the batch is output from the timer, then the
> processing-time value of the timer will be used.
>
Seems like bugs.

> - Should I start setting outputTimestamp explicitly - ideally to the
> minimum timestamp in the current batch of elements?
>

That's a sensible default. Could pass a TimestampCombiner if that leads to
watermark delays. Given the nature of the transform, the simple approach
will probably be fine.

> - Should we have the option to preserve all element timestamps? This
> could look something like this:
>
> PCollection<KV<K, TimestampedValue<V>>> batches =
>     input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>

This seems useful.

> 2. flushBatch always resets the timer, even after the batch is processed.
> The only reason I can think of for doing so is to update the watermark
> hold. TimerInternals contains a deleteTimer method - is there any reason we
> shouldn't simply implement Timer.clear and hook it up to
> the existing deleteTimer?
>

Makes sense to me. Prior thread on this seemed to have lightweight
consensus:
https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E

> 3. This transform was implemented before OnWindowExpiration was
> implemented. I think we should add a window-expiration callback, and stop
> setting the end-of-window timer.
>

+1

Kenn
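
For illustration only, here is a minimal sketch of the "minimum timestamp in
the current batch" idea from point 1, written in plain Java with no Beam
dependency. The class MinTimestampBatcher and its methods are hypothetical
stand-ins, not part of GroupIntoBatches or any Beam API. The point it shows is
that the size-triggered path and the timer-style flush path both emit the same
explicitly tracked timestamp, instead of each path picking its own default.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

/**
 * Hypothetical per-key batching buffer (NOT Beam's GroupIntoBatches).
 * It tracks the minimum element timestamp seen in the current batch and
 * emits every batch with that timestamp, whichever path triggers the flush.
 */
class MinTimestampBatcher<V> {
    private final int batchSize;
    private final BiConsumer<List<V>, Instant> output;
    private final List<V> buffer = new ArrayList<>();
    private Instant minTimestamp; // null while the buffer is empty

    MinTimestampBatcher(int batchSize, BiConsumer<List<V>, Instant> output) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.output = output;
    }

    /** Element path: buffer the value and flush once the batch is full. */
    void add(V value, Instant timestamp) {
        buffer.add(value);
        if (minTimestamp == null || timestamp.isBefore(minTimestamp)) {
            minTimestamp = timestamp;
        }
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Timer-style path: emit whatever is buffered; a no-op when empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Copy the buffer so the consumer keeps its batch after we clear.
        output.accept(new ArrayList<>(buffer), minTimestamp);
        buffer.clear();
        minTimestamp = null;
    }

    public static void main(String[] args) {
        MinTimestampBatcher<String> batcher = new MinTimestampBatcher<>(
            2, (batch, ts) -> System.out.println(ts + " " + batch));
        batcher.add("a", Instant.ofEpochMilli(30));
        batcher.add("b", Instant.ofEpochMilli(10)); // size reached, flushes
        batcher.add("c", Instant.ofEpochMilli(20));
        batcher.flush(); // stands in for a timer firing
    }
}
```

In an actual stateful DoFn the running minimum would presumably live in state
next to the buffered elements, and a different combining rule could be swapped
in if holding the watermark at the minimum causes delays; both of those are
assumptions and are not shown here.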