I agree that variable support across Runners does limit the adoption of a 
feature.  But adoption is also limited if the SDKs and their local / direct 
runners don't yet support the feature. The Go SDK doesn't currently have a way 
of specifying that annotation, preventing use.  (The lack of mention of the 
Python direct runner in your list implies it's not yet supported by the Python 
SDK, and a quick search shows that's likely [0].)

While not yet widely available to the other SDKs, Prism, the new Go SDK Local 
Runner, maintains data in event time sorted heaps [1]. The intent is to 
implement the annotation (among other features) once I start running the Java 
and Python Validates Runner suites against it.

I think stateful transforms are getting the event ordering on values for "free" 
as a result [2], but there's no special behavior at present if the DoFn is 
consuming the result of a Group By Key.

Part of the issue is that by definition, a GBK "loses" the timestamps of the 
values, and doesn't emit them, outside of using them to determine the resulting 
timestamp of the Key... [3]. To make use of the timestamp in the aggregation 
stage a runner would need to do something different in the GBK, namely sorting 
by the timestamp as the data is ingested, and keeping that timestamp around to 
continue the sort. This prevents the more efficient implementation of directly 
arranging the received element bytes into the Iterator format, and requires a 
post-process filtering step instead. Not hard, but a little dissatisfying.
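
As a rough sketch of that "something different", assuming a purely in-memory 
grouping (the timestamped type and groupSorted function are hypothetical, not 
anything from Beam or Prism): the GBK holds on to each value's timestamp, 
sorts within the key, and only then strips the timestamps to build the 
iterable.

```go
package main

import "sort"

// timestamped is a hypothetical keyed input element that still carries
// its event timestamp into the grouping stage.
type timestamped struct {
	key       string
	timestamp int64 // event time in milliseconds (assumed representation)
	value     string
}

// groupSorted groups values by key, sorts each group by event timestamp,
// and then drops the timestamps, as a GBK output would.
func groupSorted(in []timestamped) map[string][]string {
	groups := map[string][]timestamped{}
	for _, e := range in {
		groups[e.key] = append(groups[e.key], e)
	}
	out := make(map[string][]string, len(groups))
	for k, es := range groups {
		// Stable sort keeps arrival order for equal timestamps.
		sort.SliceStable(es, func(i, j int) bool {
			return es[i].timestamp < es[j].timestamp
		})
		// The extra post-processing pass: strip timestamps after sorting.
		vals := make([]string, 0, len(es))
		for _, e := range es {
			vals = append(vals, e.value)
		}
		out[k] = vals
	}
	return out
}
```

The second loop over each group is the extra pass I mean: the values can't 
be written straight into their final layout as they arrive.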

Skimming through the discussion, I agree with the general utility goal of the 
annotation, but as with many Beam features, there may be a discoverability 
problem. The feature isn't mentioned in the Programming Guide (AFAICT). 
Searching the Beam site, the top result is the Javadoc for the annotation 
(which is good, but you still need to know to look for it), and the next 
time-related result is OrderedListState, which doesn't yet have a meaningful 
portable representation last I checked [4], once again limiting adoption.

Probably the most critical bit is, while we have broad "handling" of the 
annotation, I'm hard pressed to say we even use the annotation outside of 
tests. A search [5] doesn't show any "Transforms" or "IOs" making use of it 
with the only markdown/documentation about it being the Beam 2.20.0 release 
notes saying it's now supported in Flink and Spark [6].

I will say, this isn't grounds for removing the feature, as I can only check 
what's in the repo, and not what end users have, but it does indicate we didn't 
drive the feature to completion and enable user adoption beyond "This exists, 
and we can tell you about it if you ask."

AFAICT this is just one of those features we built, but then proceeded neither 
to use within Beam nor to evangelize. This is a point we could certainly do 
better on in Beam as a whole.

Robert Burke
Beam Go Busybody

[0]  
https://github.com/search?q=repo%3Aapache%2Fbeam+TIME_SORTED_INPUT+language%3APython&type=code

[1] 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L93

[2] 
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1094

[3] 
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1132

[4] 
https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+OrderedListState

[5] 
https://github.com/search?q=repo%3Aapache%2Fbeam+RequiresTimeSortedInput&type=code&p=2

[6] 
https://github.com/apache/beam/blob/b4c23b32f2b80ce052c8a235e5064c69f37df992/website/www/site/content/en/blog/beam-2.20.0.md?plain=1#L46

On 2024/01/18 16:14:56 Jan Lukavský wrote:
> Hi,
> 
> recently I came across the fact that most runners do not support 
> @RequiresTimeSortedInput annotation for sorting per-key data by event 
> timestamp [1]. Actually, runners supporting it seem to be Direct java, 
> Flink and Dataflow batch (as it is a noop there). The annotation has 
> use-cases in time-series data processing, in transaction processing and 
> more. Though it is absolutely possible to implement the time-sorting 
> manually (e.g. [2]), this is actually efficient only in streaming mode, 
> in batch mode the runner typically wants to leverage the internal 
> sort-grouping it already does.
> 
> The original idea was to implement this annotation inside 
> StatefulDoFnRunner, which would be used by majority of runners. It turns 
> out that this is not the case. The question now is, should we use an 
> alternative place to implement the annotation (e.g. Pipeline expansion, 
> or DoFnInvoker) so that more runners can benefit from it automatically 
> (at least for streaming case, batch case needs to be implemented 
> manually)? Do the community find the annotation useful? I'm linking a 
> rather old (and long :)) thread that preceded introduction of the 
> annotation [3] for more context.
> 
> I sense the current adoption of the annotation by runners makes it 
> somewhat use-less.
> 
> Looking forward to any comments on this.
> 
> Best,
> 
>   Jan
> 
> [1] 
> https://beam.apache.org/releases/javadoc/2.53.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
> 
> [2] 
> https://cloud.google.com/spanner/docs/change-streams/use-dataflow#order-by-key
> 
> [3] https://lists.apache.org/thread/bkl9kk8l44xw2sw08s7m54k1wsc3n4tn
> 
> 
