Hi,

Recently I came across the fact that most runners do not support the @RequiresTimeSortedInput annotation, which sorts per-key data by event timestamp [1]. The runners that do support it seem to be the Java direct runner, Flink, and Dataflow in batch mode (where it is a no-op). The annotation has use cases in time-series data processing, in transaction processing and more. Although it is entirely possible to implement the time-sorting manually (e.g. [2]), this is efficient only in streaming mode; in batch mode the runner typically wants to leverage the sort-grouping it already does internally.
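
To make the manual streaming approach concrete, here is a minimal sketch in plain Java (no Beam dependency) of the buffer-and-flush idea: hold one key's elements in a buffer ordered by event timestamp and release them once the watermark passes. TimeSortBuffer and its methods are hypothetical names for illustration, not Beam APIs. The sketch assumes the watermark means "no more elements with a smaller timestamp are expected" and ignores allowed lateness; in a real DoFn the buffer would live in per-key state and the flush would run from a timer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical helper (not part of Beam): time-sorts the elements of a single key. */
final class TimeSortBuffer<T> {

  /** An element together with its event timestamp and arrival order. */
  private static final class Stamped<V> {
    final long timestamp;
    final long seq;
    final V value;

    Stamped(long timestamp, long seq, V value) {
      this.timestamp = timestamp;
      this.seq = seq;
      this.value = value;
    }
  }

  // Ordered by event timestamp; ties are broken by arrival order.
  private final PriorityQueue<Stamped<T>> buffer =
      new PriorityQueue<>(
          Comparator.comparingLong((Stamped<T> s) -> s.timestamp)
              .thenComparingLong(s -> s.seq));

  private long nextSeq = 0;
  private long watermark = Long.MIN_VALUE;

  /** Buffers an element. Returns false and drops it if it is already behind the watermark. */
  boolean add(long timestamp, T value) {
    if (timestamp < watermark) {
      return false; // late element: emitting it now would break the ordering
    }
    buffer.add(new Stamped<>(timestamp, nextSeq++, value));
    return true;
  }

  /** Advances the watermark and returns, in timestamp order, all elements strictly before it. */
  List<T> advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    List<T> ready = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek().timestamp < watermark) {
      ready.add(buffer.poll().value);
    }
    return ready;
  }
}
```

In streaming this costs only the buffered state per key. In batch the watermark typically jumps from the beginning to the end of time, so the whole key would sit in the buffer, which is why reusing the runner's own sort-grouping is preferable there.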

The original idea was to implement this annotation inside StatefulDoFnRunner, which was expected to be used by the majority of runners. It turns out that this is not the case. The question now is: should we use an alternative place to implement the annotation (e.g. Pipeline expansion, or DoFnInvoker) so that more runners can benefit from it automatically (at least for the streaming case; the batch case would still need to be implemented manually)? Does the community find the annotation useful? I'm linking a rather old (and long :)) thread that preceded the introduction of the annotation [3] for more context.

I sense that the current level of adoption by runners makes the annotation somewhat useless.

Looking forward to any comments on this.

Best,

 Jan

[1] https://beam.apache.org/releases/javadoc/2.53.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html

[2] https://cloud.google.com/spanner/docs/change-streams/use-dataflow#order-by-key

[3] https://lists.apache.org/thread/bkl9kk8l44xw2sw08s7m54k1wsc3n4tn
