Hi,
Recently I realized that most runners do not support the
@RequiresTimeSortedInput annotation, which sorts per-key data by event
timestamp [1]. The runners that do support it seem to be the Java Direct
runner, Flink, and Dataflow in batch mode (where it is a no-op). The
annotation has use cases in time-series processing, transaction
processing, and more. Although time-sorting can be implemented manually
(e.g. [2]), that approach is efficient only in streaming mode; in batch
mode the runner typically wants to leverage the sort-grouping it already
performs internally.
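To make the streaming case concrete, here is a minimal, runner-agnostic sketch in plain Java (not the Beam API) of the buffering that manual time-sorting boils down to. Elements are buffered per key and released in event-timestamp order once a simulated watermark has passed them, and elements arriving behind the watermark are dropped. TimeSortedBuffer, add and advanceWatermark are hypothetical names. In an actual Beam pipeline the buffer would live in per-key state and be flushed from an event-time timer, and lateness handling (allowed lateness) is more involved than the simple drop assumed here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical helper (not a Beam API): buffers elements per key and releases
// them in event-time order once the simulated watermark has passed them.
final class TimeSortedBuffer<K, V> {

  // One buffered element; seq preserves arrival order among equal timestamps.
  private static final class Buffered<T> {
    final long timestamp;
    final long seq;
    final T value;

    Buffered(long timestamp, long seq, T value) {
      this.timestamp = timestamp;
      this.seq = seq;
      this.value = value;
    }
  }

  // Orders buffered elements by event timestamp, then by arrival order.
  private final Comparator<Buffered<V>> order =
      Comparator.<Buffered<V>>comparingLong(b -> b.timestamp).thenComparingLong(b -> b.seq);

  // Per-key buffers, standing in for per-key state in a runner.
  private final Map<K, PriorityQueue<Buffered<V>>> buffers = new HashMap<>();
  private long watermark = Long.MIN_VALUE;
  private long nextSeq = 0;

  // Buffers an element. Returns false and drops it if it is already behind
  // the watermark (assumed simplification: no allowed lateness).
  boolean add(K key, long timestamp, V value) {
    if (timestamp <= watermark) {
      return false;
    }
    buffers.computeIfAbsent(key, k -> new PriorityQueue<>(order))
        .add(new Buffered<>(timestamp, nextSeq++, value));
    return true;
  }

  // Advances the watermark and returns, per key, every buffered element with
  // timestamp <= watermark in timestamp order. Keys with nothing ready are omitted.
  Map<K, List<V>> advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    Map<K, List<V>> ready = new HashMap<>();
    Iterator<Map.Entry<K, PriorityQueue<Buffered<V>>>> it = buffers.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<K, PriorityQueue<Buffered<V>>> entry = it.next();
      PriorityQueue<Buffered<V>> queue = entry.getValue();
      List<V> out = new ArrayList<>();
      while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
        out.add(queue.poll().value);
      }
      if (!out.isEmpty()) {
        ready.put(entry.getKey(), out);
      }
      if (queue.isEmpty()) {
        it.remove(); // drop empty per-key buffers, like clearing state
      }
    }
    return ready;
  }
}
```

In batch mode the same ordering can come essentially for free from the runner's internal sort-grouping, which is why a runner-level implementation of the annotation is preferable there.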
The original idea was to implement the annotation inside
StatefulDoFnRunner, on the assumption that the majority of runners use
it. It turns out that they do not. The question now is: should we
implement the annotation in a different place (e.g. during Pipeline
expansion, or in DoFnInvoker) so that more runners can benefit from it
automatically (at least in the streaming case; the batch case would
still need to be implemented manually per runner)? Does the community
find the annotation useful? For more context, I'm linking a rather old
(and long :)) thread that preceded the introduction of the annotation
[3].
My sense is that the current level of runner support makes the
annotation somewhat useless.
Looking forward to any comments on this.
Best,
Jan
[1] https://beam.apache.org/releases/javadoc/2.53.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
[2] https://cloud.google.com/spanner/docs/change-streams/use-dataflow#order-by-key
[3] https://lists.apache.org/thread/bkl9kk8l44xw2sw08s7m54k1wsc3n4tn