On Mon, May 10, 2021 at 7:40 PM Reuven Lax <re...@google.com> wrote:

> Hi,
>
> I've been looking at the implementation of GroupIntoBatches (hoping to add
> support to group based on byte size), and I have a few questions about the
> current implementation.
>
> 1. I noticed that the transform does not preserve input timestamps. The
> output timestamp is not explicitly set, so it will be whatever the default
> output timestamp is. Confusingly, this depends on which path is taken. If
> the batch is output while processing an element, then the timestamp of that
> element will be used. If the batch is output from the timer, then the
> processing-time value of the timer will be used.
>

Seems like bugs.


>      - Should I start setting outputTimestamp explicitly - ideally to the
> minimum timestamp in the current batch of elements?
>

That's a sensible default. Could pass a TimestampCombiner if that leads to
watermark delays. Given the nature of the transform, the simple approach
will probably be fine.
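
A minimal sketch of the "minimum timestamp in the batch" choice, in plain
Java rather than actual Beam code. BatchTimestamps and earliest are
hypothetical names for illustration, and java.time.Instant stands in for the
Instant type the SDK uses:

```java
import java.time.Instant;
import java.util.List;

// Hypothetical helper, not part of Beam: picks an output timestamp for a batch.
final class BatchTimestamps {
  private BatchTimestamps() {}

  /** Returns the earliest element timestamp; the batch must be non-empty. */
  static Instant earliest(List<Instant> elementTimestamps) {
    if (elementTimestamps.isEmpty()) {
      throw new IllegalArgumentException("batch must not be empty");
    }
    Instant min = elementTimestamps.get(0);
    for (Instant ts : elementTimestamps) {
      // Keep the smallest timestamp seen so far.
      if (ts.isBefore(min)) {
        min = ts;
      }
    }
    return min;
  }
}
```

Using the earliest timestamp means the output is never stamped later than
any element it contains, whichever path flushes the batch.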

>      - Should we have the option to preserve all element timestamps? This
> could look something like this:
>
>     PCollection<KV<K, TimestampedValue<V>>> batches =
>         input.apply(GroupIntoBatches.ofSize(N).withTimestampedValues());
>

This seems useful.
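
To make the idea concrete, a rough sketch in plain Java (not Beam code) of
batches that keep each element's own timestamp. TimestampedBatching and its
nested Timestamped class are hypothetical stand-ins, and
withTimestampedValues() itself is only the option proposed above:

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not part of Beam.
final class TimestampedBatching {
  private TimestampedBatching() {}

  /** A value paired with the timestamp it arrived with. */
  static final class Timestamped<V> {
    final V value;
    final Instant timestamp;

    Timestamped(V value, Instant timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  /** Splits input into consecutive batches of at most batchSize elements. */
  static <V> List<List<Timestamped<V>>> batch(List<Timestamped<V>> input, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<Timestamped<V>>> batches = new ArrayList<>();
    for (int start = 0; start < input.size(); start += batchSize) {
      int end = Math.min(start + batchSize, input.size());
      // Copy the slice so each batch is independent of the input list.
      batches.add(new ArrayList<>(input.subList(start, end)));
    }
    return batches;
  }
}
```

Each batch element still carries its original timestamp, so a downstream
consumer can reassign timestamps however it needs to.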


> 2. flushBatch always resets the timer, even after the batch is processed.
> The only reason I can think of for doing so is to update the watermark
> hold. TimerInternals contains a deleteTimer method - is there any reason we
> shouldn't simply implement Timer.clear and hook it up to
> the existing deleteTimer?
>

Makes sense to me. Prior thread on this seemed to have lightweight
consensus:
https://lists.apache.org/thread.html/r91af7dff0070b80b275082ca0cff7486dc5dfdfc113f35f560600792%40%3Cdev.beam.apache.org%3E


> 3. This transform was implemented before OnWindowExpiration was
> implemented. I think we should add a window-expiration callback, and stop
> setting the end-of-window timer.
>

+1

Kenn
