Inline

On Sat, Jan 20, 2024, 9:15 AM Jan Lukavský <je...@seznam.cz> wrote:

> On 1/19/24 22:49, Robert Bradshaw via dev wrote:
>
> I think this standard design could still be made to work.
> Specifically, the graph would contain a DoFn that has the
> RequiresTimeSortedInput bit set, and as a single "subtransform" that
> has a different DoFn in its spec that does not require this bit to be
> set and whose implementation enforces this ordering (say, via state)
> before invoking the user's DoFn. This would work fine in Streaming for
> any runner, and would work OK for batch as long as the value set for
> any key fit into memory (or the batch state implementation spilled to
> disk, though that could get really slow). Runners that wanted to do
> better (e.g. provide timestamp sorting as part of their batch
> grouping, or even internally sort timestamps more efficiently than
> could be done via the SDK over the state API) could do so.
>
> Yes, this should work fine for portable runners outside Java.
>
> For Java, such a wrapper might be a bit messy, but could probably be
> hard coded above the ByteBuddy wrappers layer.
>
> +1
> Maybe we can delegate this to an (internal) annotation that would enable
> DoFns to define a subclass of DoFnInvokerFactory.
> E.g.
>
> class MyDoFn extends DoFn<InputT, OutputT> {
>
>   // Proposed (internal) annotation; it does not exist in Beam today.
>   @DoFnInvokerFactory
>   MyDoFnInvokerFactory createDoFnInvokerFactory() {
>     ...
>   }
> }
>
> TBD how much of our infrastructure assumes ParDo transforms do not
> contain subtransforms. (We could also provide a different URN for
> RequiresTimeSortedInput DoFns whose payload would be the DoFn payload,
> rather than setting a bit on the payload itself.) Rather than
> introducing nesting, we could implement the AnyOf PTransform that
> would present the two implementations as siblings (which could be
> useful elsewhere). This can be made backward compatible by providing
> one of the alternatives as the composite structure. The primary
> hesitation I have here is that it prevents much
> introspection/manipulation of the pipeline before the runner
> capabilities are known.
>
> What we really want is a way to annotate a DoFn as
> RequestsTimeSortedInput, together with a way for the runner to
> communicate to the SDK whether or not it was able to honor this
> request. That may be a more invasive change to the protocol (e.g.
> annotating PCollections with ordering properties, which is where it
> belongs[1]). I suppose we could let a runner that supports this
> capability strip the RequestsTimeSortedInput bit (or set a new bit),
> and SDKs that get unmutated transforms would know they have to do the
> sorting themselves.
>
> That sounds more like runners that are able to provide the sorting
> themselves would have a manual override for the sorted ParDo (be it via a
> bit or specific URN), no?
>

While it offhand feels like a backwards incompatible change, the
runner+SDK could have a pair of Capabilities: the SDK saying it has the
capability for the Backup/Alternative option, and the Runner saying it
doesn't have the capability for the annotation, which it can report to workers.

I don't like the explicit negative capability though. Better to have the
runner explicitly report a capability for sorted input so the SDK can avoid
repeating any sorting/buffering work, possibly as a "V2" of any existing
requirement or capabilities...
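
To make the positive-capability idea concrete, here is a minimal sketch of the decision the SDK would make. Everything in it is hypothetical: the class, the enum, and the capability URN are made up for illustration and are not Beam APIs or real URNs.

```java
import java.util.Set;

// Hypothetical sketch of SDK-side negotiation; none of these names are Beam APIs.
final class SortedInputNegotiation {
  // Made-up capability URN a runner would report when it sorts input itself.
  static final String RUNNER_SORTS_INPUT = "example:runner_capability:time_sorted_input:v2";

  enum Plan { NO_SORTING_NEEDED, RUNNER_SORTS, SDK_SORTS }

  // Decides who performs the sort for one DoFn, given the capabilities the runner reported.
  static Plan plan(boolean requiresTimeSortedInput, Set<String> runnerCapabilities) {
    if (!requiresTimeSortedInput) {
      return Plan.NO_SORTING_NEEDED;
    }
    // A positive capability lets the SDK skip its own buffering;
    // its absence means the SDK falls back to sorting by itself.
    return runnerCapabilities.contains(RUNNER_SORTS_INPUT) ? Plan.RUNNER_SORTS : Plan.SDK_SORTS;
  }
}
```

With this shape, a runner that predates the capability reports nothing and the SDK quietly does the sorting, so no explicit negative capability is needed.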

>  Jan
>
> - Robert
>
> [1] Ordering is an under-defined concept in Beam, but if we're going
> to add it my take would be that to do it properly one would want
>
> (1) Annotations on PCollections indicating whether they're unordered
> or ordered (by a certain ordering criteria, in this case
> timestamp-within-key), which could be largely inferred by
> (2) Annotations on PTransforms indicating whether they're
> order-creating, order-preserving, or order-requiring (with the default
> being unspecified=order-destroying), again parameterized by an
> ordering criteria of some kind, which criteria could form a hierarchy.
>
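
A rough sketch of how (1) could be inferred from (2), assuming a single ordering criterion (the hierarchy of criteria is left out). The enum and method names are hypothetical and not part of the Beam model; treating the output of an order-requiring transform as unordered is my assumption, since the footnote does not say.

```java
// Hypothetical model of the footnote's annotations; not part of the Beam model.
final class OrderingInference {
  enum Effect { ORDER_CREATING, ORDER_PRESERVING, ORDER_REQUIRING, ORDER_DESTROYING }

  // Returns whether a transform's output is ordered, given whether its input is.
  static boolean outputOrdered(Effect effect, boolean inputOrdered) {
    switch (effect) {
      case ORDER_CREATING:
        return true;
      case ORDER_PRESERVING:
        return inputOrdered;
      case ORDER_REQUIRING:
        if (!inputOrdered) {
          throw new IllegalStateException("order-requiring transform received unordered input");
        }
        return false; // assumption: requiring order says nothing about the output
      default:
        return false; // unspecified is treated as order-destroying
    }
  }

  // Propagates the ordering property through a linear chain of transforms.
  static boolean ordered(boolean sourceOrdered, Effect... chain) {
    boolean ordered = sourceOrdered;
    for (Effect effect : chain) {
      ordered = outputOrdered(effect, ordered);
    }
    return ordered;
  }
}
```
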
>
> On Fri, Jan 19, 2024 at 10:40 AM Kenneth Knowles <k...@apache.org> wrote:
>
> In this design space, what we have done in the past is:
>
> 1) ensure that runners all reject pipelines they cannot run correctly
> 2) if there is a default/workaround/slower implementation, provide it as an 
> override
>
> This is largely ignoring portability but I think/hope it will still work. At 
> one time I put some effort into ensuring Java Pipeline objects and proto 
> representations could roundtrip with all the necessary information for 
> pre-portability runners to still work, which are the same prereqs needed for
> pre-portable "Override" implementations to still work.
>
> TBH I'm 50/50 on the idea. If something is going to be implemented more 
> slowly or less scalably as a fallback, I think it may be best to simply be 
> upfront about being unable to really run it. It would depend on the 
> situation. For requiring time sorted input, the manual implementation is 
> probably similar to what a streaming runner might do, so it might make sense.
>
> Kenn
>
> On Fri, Jan 19, 2024 at 11:05 AM Robert Burke <rob...@frantil.com> wrote:
>
> I certainly don't have the deeper Java insight here. So one more
> portability-based reply and then I'll step back on the Java specifics.
>
> Portable runners only really have the "unknown Composite" fallback option, 
> where if the Composite's URN isn't known to the runner, it should use the 
> subgraph that is being wrapped.
>
> I suppose the protocol could be expanded: if a composite transform has a
> ParDo payload and its URN has features the runner can't handle, then the
> runner could use the fallback graph as well.
>
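
For what it's worth, the expanded fallback rule could be sketched roughly as below. This is a simplified model with hypothetical class and field names; it does not reflect the actual pipeline proto or any runner's code.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical model of the "unknown Composite" fallback; not a Beam API.
final class CompositeFallback {
  static final class Composite {
    final String name;
    final String urn;
    final Set<String> requiredFeatures;       // features the native form needs from the runner
    final List<String> fallbackSubtransforms; // subgraph the SDK put into the pipeline

    Composite(String name, String urn, Set<String> requiredFeatures,
        List<String> fallbackSubtransforms) {
      this.name = name;
      this.urn = urn;
      this.requiredFeatures = requiredFeatures;
      this.fallbackSubtransforms = fallbackSubtransforms;
    }
  }

  // Returns the transforms the runner would actually execute for this composite.
  static List<String> resolve(Composite c, Set<String> knownUrns, Set<String> supportedFeatures) {
    boolean handledNatively =
        knownUrns.contains(c.urn) && supportedFeatures.containsAll(c.requiredFeatures);
    // Unknown URN or an unsupported feature: use the wrapped subgraph instead.
    return handledNatively ? Collections.singletonList(c.name) : c.fallbackSubtransforms;
  }
}
```
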
> The SDK would then still need to construct the fallback graph
> into the Pipeline proto. This doesn't sound incompatible with what you've
> suggested the Java SDK could do, but it avoids the runner needing to be aware 
> of a specific implementation requirement around a feature it doesn't support. 
>  If it has to do something specific to support an SDK specific mechanism, 
> that's still supporting the feature, but I fear it's not a great road to 
> tread on for runners to add SDK specific implementation details.
>
> If a (portable) runner is going to spend work on doing something to handle 
> RequiresTimeSortedInput, it's probably easier to handle it generally than to 
> try to enable a Java specific work around. I'm not even sure how that could 
> work since the SDK would then need a special interpretation of what a runner 
> sent back for it to do any SDK side special backup handling, vs the simple 
> execution of the given transform.
>
> It's entirely possible I've oversimplified the "fallback" protocol described
> above, so this thread is still useful for my Prism work, especially if I see 
> any similar situations once I start on the Java Validates Runner suite.
>
> Robert Burke
> Beam Go Busybody
>
> On Fri, Jan 19, 2024, 6:41 AM Jan Lukavský <je...@seznam.cz> wrote:
>
> I was primarily focused on the Java SDK (and core-construction-java), but
> generally speaking, any SDK can provide default expansion that runners can 
> use so that it is not (should not be) required to implement this manually.
> Currently, in the Java SDK, the annotation is wired up into StatefulDoFnRunner,
> which (as the name suggests) can be used for running stateful DoFns. The problem
> is that not every runner is using this facility. Java SDK generally supports 
> providing default expansions of transforms, but _only for transforms that do 
> not have to work with dynamic state_. This is not the case for this 
> annotation - a default implementation for @RequiresTimeSortedInput has to 
> take another DoFn as input, and wire its lifecycle in a way that elements are
> buffered in a (dynamically created) buffer and fed into the downstream DoFn
> only when a timer fires.
>
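
The buffering described above can be sketched in plain Java, with no Beam dependency. This is only an illustration under stated assumptions: `TimeSortedBuffer` is a hypothetical name, an in-memory map stands in for per-key state, `long` values stand in for event timestamps, and calling `advanceWatermark` stands in for a timer firing. Late data and allowed lateness are not handled.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

// Hypothetical stand-in for a state-backed sorting wrapper; not a Beam API.
final class TimeSortedBuffer<K, V> {
  // One buffered value together with its timestamp and arrival sequence number.
  private static final class Stamped<T> {
    final long timestamp;
    final long seq;
    final T value;

    Stamped(long timestamp, long seq, T value) {
      this.timestamp = timestamp;
      this.seq = seq;
      this.value = value;
    }
  }

  // Orders by timestamp, then by arrival order so that ties are deterministic.
  private final Comparator<Stamped<V>> order =
      Comparator.<Stamped<V>>comparingLong(s -> s.timestamp).thenComparingLong(s -> s.seq);
  private final Map<K, PriorityQueue<Stamped<V>>> buffers = new HashMap<>();
  private final BiConsumer<K, V> userFn; // plays the role of the wrapped user DoFn
  private long nextSeq = 0;

  TimeSortedBuffer(BiConsumer<K, V> userFn) {
    this.userFn = userFn;
  }

  // Buffers the element instead of passing it to the user function right away.
  void add(K key, long timestamp, V value) {
    buffers
        .computeIfAbsent(key, k -> new PriorityQueue<Stamped<V>>(order))
        .add(new Stamped<V>(timestamp, nextSeq++, value));
  }

  // Releases, per key and in timestamp order, every element at or before the watermark.
  void advanceWatermark(long watermark) {
    for (Map.Entry<K, PriorityQueue<Stamped<V>>> entry : buffers.entrySet()) {
      PriorityQueue<Stamped<V>> queue = entry.getValue();
      while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
        userFn.accept(entry.getKey(), queue.poll().value);
      }
    }
  }
}
```

In batch mode every value for a key sits in the buffer until the end, which is why this approach only fits when the per-key data fits in memory or the state implementation spills to disk.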
> If I narrow down my line of thinking, it would be possible to:
>  a) create something like "dynamic pipeline expansion", which would make it
> possible to work with PTransforms in this way (probably would require some
> ByteBuddy magic)
>  b) wire this up to DoFnInvoker, which takes a DoFn and creates a class that is
> used by runners for feeding data
>
> Option b) would ensure that actually all runners support such expansion, but 
> seems to be somewhat hacky and too specific to this case. Moreover, it would 
> require knowing whether the expansion is actually required by the runner (e.g.
> if the annotation is supported explicitly - most likely for batch execution). 
> Therefore I'd be in favor of option a), this might be reusable by a broader 
> range of default expansions.
>
> In SDKs other than Java this might have different implications; the reason
> why it is somewhat more complicated to do dynamic (or generic?) expansions of 
> PTransforms in Java is mostly due to how DoFns are implemented in terms of 
> annotations and the DoFnInvokers involved for efficiency.
>
>  Jan
>
> On 1/18/24 18:35, Robert Burke wrote:
>
> I agree that variable support across Runners does limit the adoption of a 
> feature.  But it's also then limited if the SDKs and their local / direct 
> runners don't yet support the feature. The Go SDK doesn't currently have a 
> way of specifying that annotation, preventing use.  (The lack of mention of
> the Python direct runner in your list implies it's not yet supported by the
> Python SDK, and a quick search shows that's likely [0].)
>
> While not yet widely available to the other SDKs, Prism, the new Go SDK Local 
> Runner, maintains data in event time sorted heaps [1]. The intent was to 
> implement the annotation (among other features) once I start running the Java 
> and Python Validates Runner suites against it.
>
> I think stateful transforms are getting the event ordering on values for 
> "free" as a result [2], but there's no special behavior at present if the
> DoFn is consuming the result of a Group By Key.
>
> Part of the issue is that by definition, a GBK "loses" the timestamps of the 
> values, and doesn't emit them, outside of using them to determine the 
> resulting timestamp of the Key... [3]. To make use of the timestamp in the 
> aggregation stage a runner would need to do something different in the GBK, 
> namely sorting by the timestamp as the data is ingested, and keeping that 
> timestamp around to continue the sort. This prevents a more efficient 
> implementation of directly arranging the received element bytes into the 
> Iterator format, requiring a post process filtering. Not hard, but a little 
> dissatisfying.
>
> Skimming through the discussion, I agree with the general utility goal of the 
> annotation, but as with many Beam features, there may be a discoverability 
> problem. The feature isn't mentioned in the Programming Guide (AFAICT), and 
> trying to find anything on the beam site, the top result is the Javadoc for 
> the annotation (which is good, but you still need to know to look for it), 
> and then the next time related bit is OrderedListState which doesn't yet have 
> a meaningful portable representation last I checked [4], once again limiting 
> adoption.
>
> Probably the most critical bit is, while we have broad "handling" of the 
> annotation, I'm hard pressed to say we even use the annotation outside of 
> tests. A search [5] doesn't show any "Transforms" or "IOs" making use of it 
> with the only markdown/documentation about it being the Beam 2.20.0 release 
> notes saying it's now supported in Flink and Spark [6].
>
> I will say, this isn't grounds for removing the feature, as I can only check 
> what's in the repo, and not what end users have, but it does indicate we 
> didn't drive the feature to completion and enable user adoption beyond "This 
> Exists, and we can tell you about it if you ask.".
>
> AFAICT this is just one of those features we built, but then proceeded neither to
> use within Beam nor to evangelize. This is a point we could certainly do better
> on in Beam as a whole.
>
> Robert Burke
> Beam Go Busybody
>
> [0]  
> https://github.com/search?q=repo%3Aapache%2Fbeam+TIME_SORTED_INPUT+language%3APython&type=code
>
> [1] 
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L93
>
> [2] 
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go#L1094
>
> [3] 
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1132
>
> [4] 
> https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+OrderedListState
>
> [5] 
> https://github.com/search?q=repo%3Aapache%2Fbeam+RequiresTimeSortedInput&type=code&p=2
>
> [6] 
> https://github.com/apache/beam/blob/b4c23b32f2b80ce052c8a235e5064c69f37df992/website/www/site/content/en/blog/beam-2.20.0.md?plain=1#L46
>
> On 2024/01/18 16:14:56 Jan Lukavský wrote:
>
> Hi,
>
> recently I came across the fact that most runners do not support
> @RequiresTimeSortedInput annotation for sorting per-key data by event
> timestamp [1]. Actually, runners supporting it seem to be Direct Java,
> Flink and Dataflow batch (as it is a noop there). The annotation has
> use-cases in time-series data processing, in transaction processing and
> more. Though it is absolutely possible to implement the time-sorting
> manually (e.g. [2]), this is actually efficient only in streaming mode,
> in batch mode the runner typically wants to leverage the internal
> sort-grouping it already does.
>
> The original idea was to implement this annotation inside
> StatefulDoFnRunner, which would be used by the majority of runners. It turns
> out that this is not the case. The question now is, should we use an
> alternative place to implement the annotation (e.g. Pipeline expansion,
> or DoFnInvoker) so that more runners can benefit from it automatically
> (at least for streaming case, batch case needs to be implemented
> manually)? Does the community find the annotation useful? I'm linking a
> rather old (and long :)) thread that preceded introduction of the
> annotation [3] for more context.
>
> I sense the current adoption of the annotation by runners makes it
> somewhat useless.
>
> Looking forward to any comments on this.
>
> Best,
>
>   Jan
>
> [1]https://beam.apache.org/releases/javadoc/2.53.0/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html
>
> [2]https://cloud.google.com/spanner/docs/change-streams/use-dataflow#order-by-key
>
> [3] https://lists.apache.org/thread/bkl9kk8l44xw2sw08s7m54k1wsc3n4tn
>
>
