On Tue, May 11, 2021 at 9:01 AM Kenneth Knowles <k...@apache.org> wrote:
>
> On Mon, May 10, 2021 at 7:40 PM Reuven Lax <re...@google.com> wrote:
>
>> Hi,
>>
>> I've been looking at the implementation of GroupIntoBatches (hoping to
>> add support to group based on byte size), and I have a few questions about
>> the current implementation.
>>
>> 1. I noticed that the transform does not preserve input timestamps. The
>> output timestamp is not explicitly set, so it will be whatever the default
>> output timestamp is. Confusingly, this will depend on which path is taken.
>> If the batch is output while processing an element, then the timestamp of
>> that element will be used. If the batch is output from the timer, then the
>> processing-time value of the timer will be used.
>>
>
> Seems like bugs.
>
>
>> - Should I start setting outputTimestamp explicitly - ideally to the
>> minimum timestamp in the current batch of elements?
>>
>
> That's a sensible default. Could pass a TimestampCombiner if that leads to
> watermark delays. Given the nature of the transform, the simple approach
> will probably be fine.
>

This is a bit tricky to do. In order to update the output timestamp, we
need to call setTimer again. However, since we are calling
timer.offset().setRelative(), this will keep bumping the timer into the
future and it will never fire.

One solution would be for GIB to just store the current timer target in
state, and make sure that we keep setting the same target until the timer
fires. However, that precludes us from using setRelative (since that method
does not allow the DoFn to see what target time is being set). I think that
this approach won't play nice with TestStream.advanceProcessingTimeTo.

We could also add a way to update _just_ the output time for a timer
without resetting the entire timer.

>
>> - Should we have the option to preserve all element timestamps? This
>> could look something like this:
>>
>> PCollection<KV<K, TimestampedValue<V>>> batches =
>>
>> input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>>
>
> This seems useful.
>
>
>> 2. flushBatch always resets the timer, even after the batch is processed.
>> The only reason I can think of for doing so is to update the watermark
>> hold. TimerInternals contains a deleteTimer method - is there any reason we
>> shouldn't simply implement Timer.clear and hook it up to
>> the existing deleteTimer?
>>
>
> Makes sense to me. Prior thread on this seemed to have lightweight
> consensus:
> https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E
>
>
>> 3. This transform was implemented before OnWindowExpiration was
>> implemented. I think we should add a window-expiration callback, and stop
>> setting the end-of-window timer.
>>
>
> +1
>
> Kenn
>
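
To make the "store the timer target in state" idea concrete, here is a
rough sketch in plain Java. It does not use any Beam classes: the class
name, fields, and methods are all hypothetical stand-ins for per-key state
cells and a processing-time timer, and it only models the bookkeeping. The
point is that the target is computed once per batch, so later elements can
lower the output timestamp (the minimum element timestamp) without pushing
the firing time into the future.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of one key's batching state (hypothetical, not Beam API). */
class BatchTimerSketch {
    private final int batchSize;
    private final long maxBufferingMillis;

    // Stand-ins for state cells; all names are assumptions for illustration.
    private final List<String> batch = new ArrayList<>();
    private Long timerTarget = null;   // processing time at which the batch must flush
    private Long minTimestamp = null;  // minimum event timestamp in the current batch

    BatchTimerSketch(int batchSize, long maxBufferingMillis) {
        this.batchSize = batchSize;
        this.maxBufferingMillis = maxBufferingMillis;
    }

    /** Buffers one element; returns the batch's output timestamp if it flushed, else null. */
    Long processElement(String value, long eventTimestamp, long nowProcessingTime) {
        batch.add(value);
        minTimestamp = (minTimestamp == null)
            ? eventTimestamp
            : Math.min(minTimestamp, eventTimestamp);
        if (timerTarget == null) {
            // First element of a batch: fix the target once and remember it.
            timerTarget = nowProcessingTime + maxBufferingMillis;
        }
        // A real DoFn would re-set the timer here to the SAME stored target,
        // changing only the output timestamp to minTimestamp.
        return batch.size() >= batchSize ? flush() : null;
    }

    /** Models the timer callback; flushes only once the stored target is reached. */
    Long onTimer(long nowProcessingTime) {
        if (timerTarget != null && nowProcessingTime >= timerTarget) {
            return flush();
        }
        return null;
    }

    Long getTimerTarget() {
        return timerTarget;
    }

    /** Emits the batch with the minimum timestamp and clears all state. */
    private Long flush() {
        Long outputTimestamp = minTimestamp;
        batch.clear();
        timerTarget = null;
        minTimestamp = null;
        return outputTimestamp;
    }
}
```

Note that this models an absolute target, which is exactly the part that
does not map onto setRelative, so the TestStream concern above still
applies to any real implementation along these lines.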