I was primarily focused on the Java SDK (and core-construction-java), but
generally speaking, any SDK can provide a default expansion that runners
can use, so that implementing this manually is not (and should not be)
required.
Currently, in the Java SDK, the annotation is wired up into
StatefulDoFnRunner, which (as the name suggests) can be used for running
stateful DoFns. The problem is that not every runner uses this
facility. The Java SDK generally supports providing default expansions of
transforms, but _only for transforms that do not have to work with
dynamic state_. This is not the case for this annotation: a default
implementation for @RequiresTimeSortedInput has to take another DoFn as
input and wire its lifecycle so that elements are buffered in a
(dynamically created) buffer and fed into the downstream DoFn only when
a timer fires.
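
The buffer-then-flush behaviour described above can be sketched in plain
Java, without any Beam dependency. This is only an illustration under
stated assumptions: TimeSortedBuffer, add, onTimer and buffered are
hypothetical names, not part of the Beam API, and the watermark is
passed in by hand where a real runner would fire a timer.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

/** Hypothetical helper (not a Beam class): buffers elements, releases them in event-time order. */
final class TimeSortedBuffer<T> {

  /** An element together with its event timestamp and arrival sequence number. */
  private static final class Stamped<V> {
    final long timestamp;
    final long seq;
    final V value;

    Stamped(long timestamp, long seq, V value) {
      this.timestamp = timestamp;
      this.seq = seq;
      this.value = value;
    }
  }

  // Min-heap ordered by timestamp; ties are broken by arrival order.
  private final PriorityQueue<Stamped<T>> heap =
      new PriorityQueue<>(
          Comparator.<Stamped<T>>comparingLong(s -> s.timestamp)
              .thenComparingLong(s -> s.seq));
  private long nextSeq = 0;

  /** Buffers one element; nothing is passed downstream yet. */
  void add(long timestampMillis, T value) {
    heap.add(new Stamped<>(timestampMillis, nextSeq++, value));
  }

  /** Simulates a timer firing: emits every element with timestamp <= watermark, in order. */
  void onTimer(long watermarkMillis, BiConsumer<Long, T> downstream) {
    while (!heap.isEmpty() && heap.peek().timestamp <= watermarkMillis) {
      Stamped<T> next = heap.poll();
      downstream.accept(next.timestamp, next.value);
    }
  }

  /** Number of elements still held back. */
  int buffered() {
    return heap.size();
  }

  public static void main(String[] args) {
    TimeSortedBuffer<String> buffer = new TimeSortedBuffer<>();
    buffer.add(30L, "c");
    buffer.add(10L, "a");
    buffer.add(20L, "b");
    // Only elements at or before the watermark are released.
    buffer.onTimer(20L, (ts, v) -> System.out.println(ts + " " + v));
  }
}
```

In a real expansion the buffer would presumably live in per-key state and
the flush would be driven by an event-time timer; both are left out here
to keep the sketch self-contained.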
If I narrow down my line of thinking, it would be possible to:
a) create something like "dynamic pipeline expansion", which would
make it possible to work with PTransforms in this way (this would probably
require some ByteBuddy magic)
b) wire this up to DoFnInvoker, which takes a DoFn and creates the class
that runners use for feeding data
Option b) would ensure that all runners actually support such an expansion,
but it seems somewhat hacky and too specific to this case. Moreover,
it would require knowing whether the expansion is actually required by the
runner (e.g. whether the annotation is supported explicitly, most likely for
batch execution). Therefore I'd be in favor of option a), as it might be
reusable by a broader range of default expansions.
In SDKs other than Java this might have different implications. The
reason why it is somewhat more complicated to do dynamic (or generic?)
expansions of PTransforms in Java is mostly how DoFns are
implemented in terms of annotations, and the DoFnInvokers involved for
efficiency.
Jan
On 1/18/24 18:35, Robert Burke wrote:
I agree that variable support across Runners does limit the adoption of a
feature. But it's also then limited if the SDKs and their local / direct
runners don't yet support the feature. The Go SDK doesn't currently have a way
of specifying that annotation, preventing use. (The lack of mention of the
Python direct runner in your list implies it's not yet supported by the Python
SDK, and a quick search shows that's likely [0].)
While not yet widely available to the other SDKs, Prism, the new Go SDK Local
Runner, maintains data in event time sorted heaps [1]. The intent was to
implement the annotation (among other features) once I start running the Java
and Python Validates Runner suites against it.
I think stateful transforms are getting the event ordering on values for "free"
as a result [2], but there's no special behavior at present if the DoFn is consuming the
result of a Group By Key.
Part of the issue is that by definition, a GBK "loses" the timestamps of the
values, and doesn't emit them, outside of using them to determine the resulting timestamp
of the Key... [3]. To make use of the timestamp in the aggregation stage a runner would
need to do something different in the GBK, namely sorting by the timestamp as the data is
ingested, and keeping that timestamp around to continue the sort. This prevents a more
efficient implementation that directly arranges the received element bytes into the
Iterator format, and requires post-process filtering instead. Not hard, but a little dissatisfying.
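
As a rough illustration of what "sorting by the timestamp as the data is
ingested" could look like, here is a minimal Java sketch. It is not how
Prism (which is written in Go and uses heaps) or any Beam runner
implements GBK; TimeSortedGrouping, ingest and valuesFor are
hypothetical names, and a TreeMap per key is swapped in as the sorted
structure.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch (not a Beam class): a group-by-key that keeps values sorted by timestamp. */
final class TimeSortedGrouping<K, V> {

  // Per key: timestamp -> values with that timestamp, in arrival order.
  private final Map<K, TreeMap<Long, List<V>>> groups = new HashMap<>();

  /** The timestamp has to be kept next to the value so the sort can continue as data arrives. */
  void ingest(K key, long timestampMillis, V value) {
    groups
        .computeIfAbsent(key, k -> new TreeMap<>())
        .computeIfAbsent(timestampMillis, t -> new ArrayList<>())
        .add(value);
  }

  /** Post-processing step: walk the sorted entries and strip the timestamps again. */
  List<V> valuesFor(K key) {
    List<V> out = new ArrayList<>();
    TreeMap<Long, List<V>> byTime = groups.get(key);
    if (byTime != null) {
      for (List<V> sameTimestamp : byTime.values()) {
        out.addAll(sameTimestamp);
      }
    }
    return out;
  }
}
```

The extra pass in valuesFor corresponds to the post-processing cost
mentioned above: the values cannot be laid out in their final iteration
format until the timestamps have been used for ordering and then dropped.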
Skimming through the discussion, I agree with the general utility goal of the
annotation, but as with many Beam features, there may be a discoverability
problem. The feature isn't mentioned in the Programming Guide (AFAICT), and
when searching the Beam site, the top result is the Javadoc for the
annotation (which is good, but you still need to know to look for it), and then
the next time-related bit is OrderedListState, which doesn't yet have a
meaningful portable representation last I checked [4], once again limiting
adoption.
Probably the most critical bit is, while we have broad "handling" of the annotation, I'm hard
pressed to say we even use the annotation outside of tests. A search [5] doesn't show any
"Transforms" or "IOs" making use of it with the only markdown/documentation about it
being the Beam 2.20.0 release notes saying it's now supported in Flink and Spark [6].
I will say, this isn't grounds for removing the feature, as I can only check what's in
the repo, and not what end users have, but it does indicate we didn't drive the feature
to completion and enable user adoption beyond "This Exists, and we can tell you
about it if you ask.".
AFAICT this is just one of those features we built, but then did not go on to
use within Beam or evangelize. This is a point we could certainly do better
on in Beam as a whole.
Robert Burke
Beam Go Busybody
[0] https://github.com/search?q=repo%3Aapache%2Fbeam+TIME_SORTED_INPUT+language%3APython&type=code
[1] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L93
[2] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1094
[3] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1132
[4] https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+OrderedListState
[5] https://github.com/search?q=repo%3Aapache%2Fbeam+RequiresTimeSortedInput&type=code&p=2
[6] https://github.com/apache/beam/blob/b4c23b32f2b80ce052c8a235e5064c69f37df992/website/www/site/content/en/blog/beam-2.20.0.md?plain=1#L46
On 2024/01/18 16:14:56 Jan Lukavský wrote:
Hi,
recently I came across the fact that most runners do not support the
@RequiresTimeSortedInput annotation for sorting per-key data by event
timestamp [1]. Actually, the runners supporting it seem to be Direct Java,
Flink and Dataflow batch (as it is a noop there). The annotation has
use-cases in time-series data processing, in transaction processing and
more. Though it is absolutely possible to implement the time-sorting
manually (e.g. [2]), this is actually efficient only in streaming mode;
in batch mode the runner typically wants to leverage the internal
sort-grouping it already does.
The original idea was to implement this annotation inside
StatefulDoFnRunner, which would be used by the majority of runners. It turns
out that this is not the case. The question now is, should we use an
alternative place to implement the annotation (e.g. Pipeline expansion,
or DoFnInvoker) so that more runners can benefit from it automatically
(at least for the streaming case; the batch case needs to be implemented
manually)? Does the community find the annotation useful? I'm linking a
rather old (and long :)) thread that preceded the introduction of the
annotation [3] for more context.
I sense the current adoption of the annotation by runners makes it
somewhat useless.
Looking forward to any comments on this.
Best,
Jan
[1] https://beam.apache.org/releases/javadoc/2.53.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
[2] https://cloud.google.com/spanner/docs/change-streams/use-dataflow#order-by-key
[3] https://lists.apache.org/thread/bkl9kk8l44xw2sw08s7m54k1wsc3n4tn