Inline

On Sat, Jan 20, 2024, 9:15 AM Jan Lukavský <je...@seznam.cz> wrote:
> On 1/19/24 22:49, Robert Bradshaw via dev wrote:
> > I think this standard design could still be made to work. Specifically, the graph would contain a DoFn that has the RequiresTimeSortedInput bit set and, as its single "subtransform", a transform whose spec holds a different DoFn that does not require this bit and whose implementation enforces the ordering (say, via state) before invoking the user's DoFn. This would work fine in streaming for any runner, and would work OK for batch as long as the values for any key fit into memory (or the batch state implementation spilled to disk, though that could get really slow). Runners that wanted to do better (e.g. provide timestamp sorting as part of their batch grouping, or sort timestamps internally more efficiently than could be done via the SDK over the state API) could do so.
>
> Yes, this should work fine for portable runners outside Java.
>
> > For Java, such a wrapper might be a bit messy, but could probably be hard-coded above the ByteBuddy wrappers layer.
>
> +1
>
> Maybe we can delegate this to an (internal) annotation that would enable DoFns to define a subclass of DoFnInvokerFactory. E.g.
>
> class MyDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
>   @DoFnInvokerFactory
>   MyDoFnInvokerFactory createDoFnInvokerFactory() {
>     ...
>   }
> }
>
> > TBD how much of our infrastructure assumes ParDo transforms do not contain subtransforms. (We could also provide a different URN for RequiresTimeSortedInput DoFns whose payload would be the DoFn payload, rather than setting a bit on the payload itself.) Rather than introducing nesting, we could implement an AnyOf PTransform that would present the two implementations as siblings (which could be useful elsewhere). This can be made backward compatible by providing one of the alternatives as the composite structure.
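A minimal sketch of the ordering-enforcing wrapper described above, written as plain Java with no Beam dependency. It assumes a simplified model in which per-key "state" is an in-memory map and a watermark advance stands in for an event-time timer firing; `TimeSortingWrapper`, `processElement` and `advanceWatermark` are hypothetical names, not Beam's StatefulDoFnRunner or DoFn API.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch (not Beam's API): buffers elements per key and releases
// them to the wrapped user function in event-time order once the watermark,
// standing in for an event-time timer, has passed their timestamps.
final class TimeSortingWrapper<K, V> {
  // A buffered element together with its event timestamp (millis).
  private static final class Stamped<T> {
    final long timestamp;
    final T value;

    Stamped(long timestamp, T value) {
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Stands in for per-key state (e.g. a bag of buffered elements).
  private final Map<K, List<Stamped<V>>> buffers = new HashMap<>();
  // Stands in for the user's DoFn, which expects time-sorted input.
  private final BiConsumer<K, V> userFn;
  private long watermark = Long.MIN_VALUE;

  TimeSortingWrapper(BiConsumer<K, V> userFn) {
    this.userFn = userFn;
  }

  // Called for every input element; nothing reaches the user function yet.
  void processElement(K key, long timestamp, V value) {
    buffers.computeIfAbsent(key, k -> new ArrayList<>()).add(new Stamped<>(timestamp, value));
  }

  // Emits, per key and in timestamp order, every buffered element whose
  // timestamp is <= the new watermark; later elements stay buffered.
  void advanceWatermark(long newWatermark) {
    watermark = Math.max(watermark, newWatermark);
    for (Map.Entry<K, List<Stamped<V>>> entry : buffers.entrySet()) {
      List<Stamped<V>> pending = entry.getValue();
      // List.sort is stable, so equal timestamps keep their arrival order.
      pending.sort(Comparator.comparingLong(s -> s.timestamp));
      int ready = 0;
      while (ready < pending.size() && pending.get(ready).timestamp <= watermark) {
        userFn.accept(entry.getKey(), pending.get(ready).value);
        ready++;
      }
      pending.subList(0, ready).clear(); // drop what has been emitted
    }
    buffers.values().removeIf(List::isEmpty);
  }
}
```

This mirrors the streaming approach: in batch, where the watermark typically advances only once all input has been read, every key's values sit in the buffer at the same time, which is the memory limitation Robert mentions. Elements arriving behind the watermark are simply released at the next advance, so the sketch only guarantees order for on-time data.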
> > The primary hesitation I have here is that it prevents much introspection/manipulation of the pipeline before the runner capabilities are known.
> >
> > What we really want is a way to annotate a DoFn as RequestsTimeSortedInput, together with a way for the runner to communicate to the SDK whether or not it was able to honor this request. That may be a more invasive change to the protocol (e.g. annotating PCollections with ordering properties, which is where it belongs [1]). I suppose we could let a runner that supports this capability strip the RequestsTimeSortedInput bit (or set a new bit), and SDKs that get unmutated transforms would know they have to do the sorting themselves.
>
> That sounds more like runners that are able to provide the sorting themselves would have a manual override for the sorted ParDo (be it via a bit or a specific URN), no?

While offhand it feels like a backwards-incompatible change, the runner and SDK could have a pair of capabilities: the SDK saying it has the capability for the backup/alternative option, and the runner saying it doesn't have the capability for the annotation, which it can report to workers.

I don't like the explicit negative capability, though. It's better to have the runner explicitly report a capability for sorted input, so the SDK can avoid repeating any sorting/buffering work, possibly as a "V2" of any existing requirement or capability...
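The positive-capability variant can be sketched as a small decision function on the SDK side. This assumes the runner's capabilities arrive as a set of URN strings; `SORTED_INPUT_CAPABILITY` is an invented placeholder URN, not an existing Beam constant, and `SortedInputNegotiation` is hypothetical.

```java
import java.util.Set;

// Hypothetical sketch of the positive-capability handshake: the runner reports
// what it guarantees, and the SDK adds its own buffering/sorting only when the
// guarantee is missing. The URN below is an invented placeholder.
final class SortedInputNegotiation {
  static final String SORTED_INPUT_CAPABILITY =
      "example:runner_capability:time_sorted_input:v2";

  enum Plan {
    NO_SORTING_NEEDED, // the DoFn did not request sorted input
    RUNNER_SORTS,      // the runner promised sorted input; invoke the DoFn directly
    SDK_SORTS          // no promise; the SDK must buffer and sort itself
  }

  static Plan plan(boolean dofnRequestsSortedInput, Set<String> runnerCapabilities) {
    if (!dofnRequestsSortedInput) {
      return Plan.NO_SORTING_NEEDED;
    }
    return runnerCapabilities.contains(SORTED_INPUT_CAPABILITY)
        ? Plan.RUNNER_SORTS
        : Plan.SDK_SORTS;
  }
}
```

An older runner that has never heard of the capability simply omits it, so it falls into `SDK_SORTS` without any negative capability being needed.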

> Jan
>
> > - Robert
> >
> > [1] Ordering is an under-defined concept in Beam, but if we're going to add it, my take would be that to do it properly one would want
> >
> > (1) Annotations on PCollections indicating whether they're unordered or ordered (by a certain ordering criterion, in this case timestamp-within-key), which could be largely inferred by
> > (2) Annotations on PTransforms indicating whether they're order-creating, order-preserving, or order-requiring (with the default being unspecified = order-destroying), again parameterized by an ordering criterion of some kind, which criteria could form a hierarchy.
> >
> > On Fri, Jan 19, 2024 at 10:40 AM Kenneth Knowles <k...@apache.org> wrote:
> > > In this design space, what we have done in the past is:
> > >
> > > 1) ensure that all runners reject pipelines they cannot run correctly
> > > 2) if there is a default/workaround/slower implementation, provide it as an override
> > >
> > > This is largely ignoring portability, but I think/hope it will still work. At one time I put some effort into ensuring Java Pipeline objects and proto representations could roundtrip with all the necessary information for pre-portability runners to still work, which is the same prerequisite as for pre-portability "Override" implementations to still work.
> > >
> > > TBH I'm 50/50 on the idea. If something is going to be implemented more slowly or less scalably as a fallback, I think it may be best to simply be upfront about being unable to really run it. It would depend on the situation. For requiring time-sorted input, the manual implementation is probably similar to what a streaming runner might do, so it might make sense.
> > >
> > > Kenn
> > >
> > > On Fri, Jan 19, 2024 at 11:05 AM Robert Burke <rob...@frantil.com> wrote:
> > > > I certainly don't have the deeper Java insight here, so one more portability-based reply and then I'll step back on the Java specifics.
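Footnote [1]'s proposal can be made concrete with a toy inference pass over a linear chain of transforms. This sketch assumes a single ordering criterion (timestamp-within-key) instead of a hierarchy of criteria; `OrderingInference`, `Step` and `Effect` are hypothetical. It only reports which order-requiring steps would need sorting inserted (or the pipeline rejected).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of footnote [1]: transforms declare whether they require
// ordered input and what they do to the ordering of their output; the ordering
// of each intermediate PCollection is then inferred along the chain.
final class OrderingInference {
  // Effect of a transform on its output ordering, relative to its input.
  enum Effect { CREATES, PRESERVES, DESTROYS }

  static final class Step {
    final String name;
    final boolean requiresOrder; // order-requiring on its input
    final Effect effect;         // DESTROYS models the "unspecified" default

    Step(String name, boolean requiresOrder, Effect effect) {
      this.name = name;
      this.requiresOrder = requiresOrder;
      this.effect = effect;
    }
  }

  // Returns the names of order-requiring steps whose input is not inferred to
  // be ordered by timestamp-within-key (the only criterion modeled here).
  static List<String> unsatisfied(List<Step> chain) {
    boolean ordered = false; // the pipeline input is assumed unordered
    List<String> missing = new ArrayList<>();
    for (Step step : chain) {
      if (step.requiresOrder && !ordered) {
        missing.add(step.name);
      }
      if (step.effect == Effect.CREATES) {
        ordered = true;
      } else if (step.effect == Effect.DESTROYS) {
        ordered = false;
      } // PRESERVES leaves 'ordered' unchanged
    }
    return missing;
  }
}
```

With order-destroying as the default, a second stateful step after a first one is flagged unless the first is explicitly declared order-preserving, which is why the footnote makes "unspecified" mean "destroying".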
> > > >
> > > > Portable runners only really have the "unknown composite" fallback option: if the composite's URN isn't known to the runner, it should use the subgraph that is being wrapped.
> > > >
> > > > I suppose the protocol could be expanded: if a composite transform has a ParDo payload and a URN with features the runner can't handle, then it could use the fallback graph as well.
> > > >
> > > > The SDK would then still need to have constructed the fallback graph into the Pipeline proto. This doesn't sound incompatible with what you've suggested the Java SDK could do, but it avoids the runner needing to be aware of a specific implementation requirement around a feature it doesn't support. If a runner has to do something specific to support an SDK-specific mechanism, that's still supporting the feature, but I fear it's not a great road for runners to tread, adding SDK-specific implementation details.
> > > >
> > > > If a (portable) runner is going to spend work on handling RequiresTimeSortedInput, it's probably easier to handle it generally than to try to enable a Java-specific workaround. I'm not even sure how that could work, since the SDK would then need a special interpretation of what a runner sent back in order to do any SDK-side special backup handling, vs. the simple execution of the given transform.
> > > >
> > > > It's entirely possible I've oversimplified the "fallback" protocol described above, so this thread is still useful for my Prism work, especially if I see any similar situations once I start on the Java ValidatesRunner suite.
> > > >
> > > > Robert Burke
> > > > Beam Go Busybody
> > > >
> > > > On Fri, Jan 19, 2024, 6:41 AM Jan Lukavský <je...@seznam.cz> wrote:
> > > > > I was primarily focused on the Java SDK (and core-construction-java), but generally speaking, any SDK can provide a default expansion that runners can use, so that they are not (should not be) required to implement this manually.
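The fallback rule Robert describes, including the proposed extension to known URNs with unsupported features, amounts to a recursive walk over the transform tree. In this sketch the URN and feature strings are placeholders, `Transform` is a hypothetical stand-in for a pipeline-proto node, and a transform with neither native support nor a fallback subgraph is rejected, in line with Kenn's point 1).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the composite fallback rule, extended as proposed:
// run a transform natively only if its URN is known AND every feature it
// declares is supported; otherwise execute the fallback subgraph that the SDK
// already wrote into the pipeline. URNs and feature names are placeholders.
final class FallbackPlanner {
  static final class Transform {
    final String name;
    final String urn;
    final Set<String> features;
    final List<Transform> subtransforms; // SDK-provided fallback, may be empty

    Transform(String name, String urn, Set<String> features, List<Transform> subtransforms) {
      this.name = name;
      this.urn = urn;
      this.features = features;
      this.subtransforms = subtransforms;
    }
  }

  // Returns the names of the transforms the runner would execute natively.
  static List<String> plan(Transform root, Set<String> knownUrns, Set<String> supportedFeatures) {
    List<String> executed = new ArrayList<>();
    expand(root, knownUrns, supportedFeatures, executed);
    return executed;
  }

  private static void expand(
      Transform t, Set<String> knownUrns, Set<String> supportedFeatures, List<String> executed) {
    boolean runNatively = knownUrns.contains(t.urn) && supportedFeatures.containsAll(t.features);
    if (runNatively) {
      executed.add(t.name);
    } else if (t.subtransforms.isEmpty()) {
      // Nothing to fall back to: reject rather than run incorrectly.
      throw new IllegalStateException("Cannot run " + t.name + " (" + t.urn + ")");
    } else {
      for (Transform sub : t.subtransforms) {
        expand(sub, knownUrns, supportedFeatures, executed);
      }
    }
  }
}
```

The runner never needs to know how the fallback sorts; it only needs to know which features it can honor, which matches Robert's concern about runners taking on SDK-specific details.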
> > > > >
> > > > > Currently, in the Java SDK, the annotation is wired up into StatefulDoFnRunner, which (as the name suggests) can be used for running stateful DoFns. The problem is that not every runner uses this facility. The Java SDK generally supports providing default expansions of transforms, but _only for transforms that do not have to work with dynamic state_. This is not the case for this annotation: a default implementation for @RequiresTimeSortedInput has to take another DoFn as input and wire up its lifecycle so that elements are buffered in a (dynamically created) buffer and fed into the downstream DoFn only when a timer fires.
> > > > >
> > > > > If I narrow down my line of thinking, it would be possible to:
> > > > > a) create something like "dynamic pipeline expansion", which would make it possible to work with PTransforms in this way (probably requiring some ByteBuddy magic)
> > > > > b) wire this up to DoFnInvoker, which takes a DoFn and creates the class that runners use for feeding data
> > > > >
> > > > > Option b) would ensure that all runners actually support such expansion, but seems somewhat hacky and too specific to this case. Moreover, it would require knowing whether the expansion is actually required by the runner (e.g. whether the annotation is supported explicitly, most likely for batch execution). Therefore I'd be in favor of option a), which might be reusable by a broader range of default expansions.
> > > > >
> > > > > In SDKs other than Java this might have different implications; the reason why it is somewhat more complicated to do dynamic (or generic?) expansions of PTransforms in Java is mostly due to how DoFns are implemented in terms of annotations, and the DoFnInvokers involved for efficiency.
> > > > >
> > > > > Jan
> > > > >
> > > > > On 1/18/24 18:35, Robert Burke wrote:
> > > > > > I agree that variable support across runners does limit the adoption of a feature. But it's also limited if the SDKs and their local/direct runners don't yet support the feature.
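Jan's option b) can be pictured as an invoker factory that inspects the user's function and, only when it is marked as needing sorted input, returns a decorated invoker with the same contract. Everything here is hypothetical: `SortedInputRequested` stands in for an annotation such as @RequiresTimeSortedInput, and `Invoker`/`InvokerFactory` are far simpler than Beam's real DoFnInvoker machinery.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical marker standing in for an annotation such as @RequiresTimeSortedInput.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SortedInputRequested {}

// Hypothetical, much-simplified invoker contract that a runner would drive.
interface Invoker<T> {
  void invokeProcessElement(long timestamp, T element);

  void invokeOnTimer(); // stands in for an event-time timer firing
}

final class InvokerFactory {
  private static final class Stamped<T> {
    final long timestamp;
    final T element;

    Stamped(long timestamp, T element) {
      this.timestamp = timestamp;
      this.element = element;
    }
  }

  static <T> Invoker<T> create(Consumer<T> userFn) {
    if (!userFn.getClass().isAnnotationPresent(SortedInputRequested.class)) {
      // Plain invoker: elements go straight to the user function.
      return new Invoker<T>() {
        @Override
        public void invokeProcessElement(long timestamp, T element) {
          userFn.accept(element);
        }

        @Override
        public void invokeOnTimer() {}
      };
    }
    // Decorated invoker with the same contract: elements are buffered and only
    // handed to the user function, sorted by timestamp, when the timer fires.
    List<Stamped<T>> buffer = new ArrayList<>();
    return new Invoker<T>() {
      @Override
      public void invokeProcessElement(long timestamp, T element) {
        buffer.add(new Stamped<>(timestamp, element));
      }

      @Override
      public void invokeOnTimer() {
        buffer.sort(Comparator.comparingLong(s -> s.timestamp));
        for (Stamped<T> s : buffer) {
          userFn.accept(s.element);
        }
        buffer.clear();
      }
    };
  }
}
```

Because the runner only ever talks to `Invoker`, every runner would pick up the buffering automatically; the catch Jan raises is visible too, since `create` decides to wrap without knowing whether the runner already honors the annotation natively.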
> > > > > > The Go SDK doesn't currently have a way of specifying that annotation, preventing its use. (The lack of mention of the Python direct runner in your list implies it's not yet supported by the Python SDK, and a quick search shows that's likely [0].)
> > > > > >
> > > > > > While not yet widely available to the other SDKs, Prism, the new Go SDK local runner, maintains data in event-time-sorted heaps [1]. The intent was to implement the annotation (among other features) once I start running the Java and Python ValidatesRunner suites against it.
> > > > > >
> > > > > > I think stateful transforms are getting the event ordering on values for "free" as a result [2], but there's no special behavior at present if the DoFn is consuming the result of a GroupByKey.
> > > > > >
> > > > > > Part of the issue is that, by definition, a GBK "loses" the timestamps of the values and doesn't emit them, other than using them to determine the resulting timestamp of the key... [3]. To make use of the timestamps in the aggregation stage, a runner would need to do something different in the GBK, namely sorting by timestamp as the data is ingested and keeping that timestamp around to continue the sort. This prevents the more efficient implementation of directly arranging the received element bytes into the Iterator format, requiring a post-processing filtering step. Not hard, but a little dissatisfying.
> > > > > >
> > > > > > Skimming through the discussion, I agree with the general utility goal of the annotation, but as with many Beam features, there may be a discoverability problem. The feature isn't mentioned in the Programming Guide (AFAICT), and when searching the Beam site, the top result is the Javadoc for the annotation (which is good, but you still need to know to look for it), and the next time-related result is OrderedListState, which doesn't yet have a meaningful portable representation last I checked [4], once again limiting adoption.
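The GBK point above can be illustrated with a grouping step that keeps each value's timestamp during ingestion and drains per-key min-heaps ordered by event time. This is a sketch under assumptions: `TimestampPreservingGbk` is hypothetical, values are held as objects rather than encoded bytes, and the heaps are only loosely analogous to Prism's event-time-sorted heaps [1].

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch: a GroupByKey variant that keeps each value's timestamp
// while ingesting (in a per-key min-heap on event time) and drains the heaps
// to produce each key's values in timestamp order.
final class TimestampPreservingGbk<K, V> {
  private static final class Item<T> {
    final long timestamp;
    final long seq; // arrival order, used to break timestamp ties
    final T value;

    Item(long timestamp, long seq, T value) {
      this.timestamp = timestamp;
      this.seq = seq;
      this.value = value;
    }
  }

  private final Map<K, PriorityQueue<Item<V>>> heaps = new LinkedHashMap<>();
  private long nextSeq = 0;

  void add(K key, long timestamp, V value) {
    heaps
        .computeIfAbsent(key, k -> new PriorityQueue<>(
            Comparator.<Item<V>>comparingLong(i -> i.timestamp).thenComparingLong(i -> i.seq)))
        .add(new Item<>(timestamp, nextSeq++, value));
  }

  // The extra post-processing step: an ordinary GBK could hand over the
  // collected values as-is, whereas here each heap must be drained in order.
  Map<K, List<V>> emit() {
    Map<K, List<V>> out = new LinkedHashMap<>();
    for (Map.Entry<K, PriorityQueue<Item<V>>> entry : heaps.entrySet()) {
      PriorityQueue<Item<V>> heap = entry.getValue();
      List<V> values = new ArrayList<>(heap.size());
      while (!heap.isEmpty()) {
        values.add(heap.poll().value);
      }
      out.put(entry.getKey(), values);
    }
    heaps.clear();
    return out;
  }
}
```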
> > > > > >
> > > > > > Probably the most critical bit is that, while we have broad "handling" of the annotation, I'm hard pressed to say we even use the annotation outside of tests. A search [5] doesn't show any transforms or IOs making use of it, and the only markdown/documentation about it is the Beam 2.20.0 release notes saying it's now supported in Flink and Spark [6].
> > > > > >
> > > > > > I will say this isn't grounds for removing the feature, as I can only check what's in the repo, not what end users have, but it does indicate that we didn't drive the feature to completion and enable user adoption beyond "This exists, and we can tell you about it if you ask."
> > > > > >
> > > > > > AFAICT this is just one of those features we built, but then proceeded not to use within Beam or evangelize. This is a point we could certainly do better on in Beam as a whole.
> > > > > >
> > > > > > Robert Burke
> > > > > > Beam Go Busybody
> > > > > >
> > > > > > [0] https://github.com/search?q=repo%3Aapache%2Fbeam+TIME_SORTED_INPUT+language%3APython&type=code
> > > > > > [1] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L93
> > > > > > [2] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1094
> > > > > > [3] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1132
> > > > > > [4] https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+OrderedListState
> > > > > > [5] https://github.com/search?q=repo%3Aapache%2Fbeam+RequiresTimeSortedInput&type=code&p=2
> > > > > > [6] https://github.com/apache/beam/blob/b4c23b32f2b80ce052c8a235e5064c69f37df992/website/www/site/content/en/blog/beam-2.20.0.md?plain=1#L46
> > > > > >
> > > > > > On 2024/01/18 16:14:56 Jan Lukavský wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > recently I came across the fact that most runners do not support the @RequiresTimeSortedInput annotation for sorting per-key data by event timestamp [1].
> > > > > > > Actually, the runners supporting it seem to be the Java direct runner, Flink, and Dataflow batch (where it is a no-op). The annotation has use cases in time-series data processing, transaction processing, and more. Though it is absolutely possible to implement the time sorting manually (e.g. [2]), this is efficient only in streaming mode; in batch mode the runner typically wants to leverage the internal sort-grouping it already does.
> > > > > > >
> > > > > > > The original idea was to implement this annotation inside StatefulDoFnRunner, which would be used by the majority of runners. It turns out that this is not the case. The question now is: should we implement the annotation in an alternative place (e.g. pipeline expansion, or DoFnInvoker) so that more runners can benefit from it automatically (at least for the streaming case; the batch case needs to be implemented manually)? Does the community find the annotation useful? I'm linking a rather old (and long :)) thread that preceded the introduction of the annotation [3] for more context.
> > > > > > >
> > > > > > > I sense the current adoption of the annotation by runners makes it somewhat useless.
> > > > > > >
> > > > > > > Looking forward to any comments on this.
> > > > > > >
> > > > > > > Best,
> > > > > > > Jan
> > > > > > >
> > > > > > > [1] https://beam.apache.org/releases/javadoc/2.53.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
> > > > > > > [2] https://cloud.google.com/spanner/docs/change-streams/use-dataflow#order-by-key
> > > > > > > [3] https://lists.apache.org/thread/bkl9kk8l44xw2sw08s7m54k1wsc3n4tn
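For the batch case, a runner that already sorts while grouping can get the required order by extending its sort key with the timestamp, with no per-key buffering in the SDK. A minimal sketch, assuming all records fit in a list (a real runner would sort externally and stream the result); `SortGroupedBatch` and `Rec` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of the batch path: one sort by (key, timestamp), after
// which each key's elements are contiguous (the "group") and already in
// event-time order, so they can be streamed straight into the user function.
final class SortGroupedBatch {
  static final class Rec {
    final String key;
    final long timestamp;
    final String value;

    Rec(String key, long timestamp, String value) {
      this.key = key;
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  static void run(List<Rec> input, BiConsumer<String, String> userFn) {
    List<Rec> sorted = new ArrayList<>(input);
    // Primary key: the grouping key; secondary key: the event timestamp.
    sorted.sort(Comparator.comparing((Rec r) -> r.key).thenComparingLong(r -> r.timestamp));
    for (Rec r : sorted) {
      userFn.accept(r.key, r.value);
    }
  }
}
```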