I created https://github.com/apache/beam/pull/10873 to add the minimal set of fields to the existing protos, and also created https://docs.google.com/document/d/1CyVElQDYHBRfXu6k1VSXv3Yok_4r8c4V0bkh2nFAWYc/edit to start enumerating some of the URNs we may want to have. It would be a good milestone to get this in by the release next week.
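To make the "requirements" half concrete, here is a rough Python sketch of the kind of check a runner could do once such a field exists. The field name and URNs below are illustrative only, not necessarily what's in the PR:

    # Illustrative only: URNs and the "requirements" field name are
    # hypothetical. A runner rejects any pipeline whose requirements it
    # does not recognize, with an error naming the offending URNs.
    SUPPORTED_REQUIREMENTS = {
        "beam:requirement:pardo:stateful:v1",
        "beam:requirement:pardo:requires_time_sorted_input:v1",
    }

    def validate_pipeline(pipeline_proto):
        unknown = set(pipeline_proto.requirements) - SUPPORTED_REQUIREMENTS
        if unknown:
            raise ValueError(
                "This runner cannot faithfully execute the pipeline; "
                "unknown requirements: %s" % sorted(unknown))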
On Thu, Feb 13, 2020 at 5:14 PM Kyle Weaver <[email protected]> wrote:
>
> > we can take advantage of these pipeline features to get rid of the categories of @ValidatesRunner tests, because we could have just a single @ValidatesRunner and each test would be matched against runner capabilities
>
> +1, I think the potential to formally integrate our ideas of compatibility and unit testing is a big advantage of this proposal. Also, when deciding where to draw lines between different URNs, it may help to look at the existing validates runner test categories, which are currently the most accurate signal we have regarding a runner's capabilities.
>
> On Thu, Feb 13, 2020 at 4:04 PM Robert Burke <[email protected]> wrote:
>>
>> Wrt the per-DoFn/ParDo level, there's the similar case of whether the DoFn has a URN for requiring something, or an annotation saying the DoFn provides something (e.g. provides k-anonymization with k defined).
>>
>> The general theme of this thread seems to be trying to ensure a runner can reject a pipeline if it's not able to provide the right guarantees, so that latter case isn't handled.
>>
>> E.g. the latter provisions could be used to analyze a pipeline at construction time to ensure the outputs are all properly anonymized to a certain degree.
>>
>> On Thu, Feb 13, 2020, 2:26 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>> On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> +1 for adding pipeline required features. I think being able to reject a pipeline with an unknown requirement is pretty much needed, mostly because that enables runners to completely decouple from SDKs while still being able to recognize when a pipeline constructed with an incompatible version of an SDK is run.
>>>>
>>>> I'll add some observations I made when implementing the latest "requires time sorted input" addition, with regard to this discussion:
>>>>
>>>> a) the features of a pipeline are not a simple function of the set of PTransforms present in the pipeline; they also depend on the (type of) inputs. For instance, a PTransform might have a simple expansion to primitive PTransforms in the streaming case, but no such expansion in the batch case. That is to say, a runner that doesn't actually know of a specific extension to some PTransform _might_ actually execute it correctly under some conditions, but _must_ fail in other cases.
>>>>
>>>> b) it would be good if this feature worked independently of portability (for the Java SDK). We still have (at least two) non-portable runners that are IMO widely used in production and are likely to last for some time.
>>>
>>> I think even if these runners keep their execution not using portability, they should migrate to use the portable pipeline definition. Then they can share the same model with runners that execute using portability. The Fn API is not required to be used as long as the runner implements the semantics of the pipeline.
>>>
>>> Kenn
>>>
>>>> c) we can take advantage of these pipeline features to get rid of the categories of @ValidatesRunner tests, because we could have just a single @ValidatesRunner and each test would be matched against runner capabilities (i.e. a runner would be tested with a given test if and only if it would not reject it)
>>>>
>>>> Jan
>>>>
>>>> On 2/13/20 8:42 PM, Robert Burke wrote:
>>>>
>>>> +1 to deferring for now. Since they should not be modified after adoption, it makes sense not to get ahead of ourselves.
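Jan's point (c) could reduce to a small matching rule in the test harness. A rough sketch, where required_urns and runner_supported_urns are hypothetical names, not existing Beam attributes:

    # Hypothetical sketch of point (c): a single @ValidatesRunner
    # category, where a test runs against a runner if and only if the
    # runner would not reject the test pipeline's requirements.
    def tests_to_run(all_tests, runner_supported_urns):
        return [
            test for test in all_tests
            if set(test.required_urns) <= set(runner_supported_urns)
        ]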
>>>>
>>>> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <[email protected]> wrote:
>>>>>
>>>>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <[email protected]> wrote:
>>>>> >
>>>>> > One thing that doesn't appear to have been suggested yet: we could "batch" URNs together under a "super URN", so that adding one super URN is like adding each URN in the batch of features it represents. This avoids needing to send dozens of URNs over individually.
>>>>> >
>>>>> > The super URNs would need to be static after definition to avoid mismatched definitions down the road.
>>>>> >
>>>>> > We collect together the URNs that are reasonably considered "vX" support, and can then increment that later.
>>>>> >
>>>>> > This would simplify new SDKs, as they can have a goal of initial v1 support as we define what that level of feature support includes, and it doesn't prevent new capabilities from being added incrementally.
>>>>>
>>>>> Yes, this is a very good idea. I've also been thinking of certain sets of common operations/well-known DoFns that often occur on opposite sides of GBKs (e.g. pair-with-one, sum-ints, drop-keys, ...) and are commonly supported, which could be grouped under these meta-URNs.
>>>>>
>>>>> Note that these need not be monotonic: for example, a current v1 might require LengthPrefixCoderV1, but if a more efficient LengthPrefixCoderV2 comes along, eventually v2 could require that and *not* require the old, now rarely used LengthPrefixCoderV1.
>>>>>
>>>>> It probably makes sense to defer adding such super-URNs until we notice a set that is commonly used together in practice.
>>>>>
>>>>> Of course there's still value in SDKs being able to support features piecemeal as well, which is the big reason we're avoiding a simple monotonically increasing version number.
>>>>>
>>>>> > Similarly, certain feature sets could stand alone, e.g. around SQL. It's beneficial for optimization reasons if an SDK has native projection and UDF support, for example, which a runner could take advantage of by avoiding extra cross-language hops. These could then also be grouped under a SQL super URN.
>>>>> >
>>>>> > This is from the SDK capability side, of course, rather than the SDK pipeline requirements side.
>>>>> >
>>>>> > -------
>>>>> >
>>>>> > Related to that last point, it might be good to nail down early the perspective used when discussing these things, as there's a duality between "what an SDK can do" and "what the runner will do to a pipeline that the SDK can understand" (e.g. combiner lifting and state-backed iterables), as well as between "what the pipeline requires from the runner" and "what the runner is able to do" (e.g. requires sorted input).
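A minimal sketch of the super-URN batching described above, with hypothetical URNs; the key property is that each super URN expands to a frozen, never-modified set:

    # Hypothetical super-URN expansion: one versioned alias stands in
    # for a frozen batch of feature URNs. Note v2 need not be a
    # superset of v1.
    SUPER_URNS = {
        "beam:version:sdk_base:v1": frozenset({
            "beam:coder:length_prefix:v1",
            "beam:transform:pardo:v1",
        }),
    }

    def expand_urns(urns):
        expanded = set()
        for urn in urns:
            # A plain URN expands to itself; a super URN to its batch.
            expanded |= SUPER_URNS.get(urn, frozenset({urn}))
        return expanded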
>>>>> >
>>>>> > On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <[email protected]> wrote:
>>>>> >>
>>>>> >> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <[email protected]> wrote:
>>>>> >>>
>>>>> >>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <[email protected]> wrote:
>>>>> >>>>
>>>>> >>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <[email protected]> wrote:
>>>>> >>>> >
>>>>> >>>> > We can always detect on the runner/SDK side whether there is an unknown field[1] within a payload and fail to process it, but this is painful in two situations:
>>>>> >>>> > 1) It doesn't provide for a good error message, since you can't say what the purpose of the field is. With a capability URN, the runner/SDK could say which URN it doesn't understand.
>>>>> >>>> > 2) It doesn't allow for the addition of fields which don't impact the semantics of execution. For example, if the display data feature were being developed, a runner could ignore it and still execute the pipeline correctly.
>>>>> >>>>
>>>>> >>>> Yeah, I don't think proto reflection is a flexible enough tool to do this well either.
>>>>> >>>>
>>>>> >>>> > If we think this to be common enough, we can add a capabilities list to the PTransform so each PTransform can do this and has a natural way of being extended for additions which are forwards compatible. The alternative to having capabilities on PTransform (and other constructs) is that we would have a new URN whenever the specification of the transform changes. For forwards-compatible changes, each SDK/runner would map older versions of the URN onto the latest and internally treat it as the latest version, but always downgrade it to the version the other party expects when communicating with it. Backwards-incompatible changes would always require a new URN, which capabilities at the PTransform level would not help with.
>>>>> >>>>
>>>>> >>>> As you point out, stateful+splittable may not be a particularly useful combination, but as another example, we have (backwards-incompatible-when-introduced) markers on DoFn as to whether it requires finalization, stable inputs, and now time sorting. I don't think we should have a new URN for each combination.
>>>>> >>>
>>>>> >>> Agree with this. I don't think stateful, splittable, and "plain" ParDo are comparable to these. Each is an entirely different computational paradigm: per-element independent processing, per-key-and-window linear processing, and per-element-and-restriction splittable processing. Most relevant IMO is the nature of the parallelism. If you added state to splittable processing, it would still be splittable processing. Just as Combine and ParDo can share the SideInput specification, it is easy to share relevant sub-structures like state declarations. But it is a fair point that the ability to split can be ignored and the transform run as a plain old ParDo. It brings up the question of whether a runner that doesn't know SDFs should have to reject them or should be allowed to run them poorly.
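Luke's forwards-compatible URN versioning quoted above might look roughly like this; the URNs and mapping are purely hypothetical:

    # Hypothetical sketch: normalize older URN versions to the latest
    # internally, and downgrade when communicating with a party that
    # only understands an older version.
    UPGRADES = {"beam:transform:foo:v1": "beam:transform:foo:v2"}
    DOWNGRADES = {new: old for old, new in UPGRADES.items()}

    def to_latest(urn):
        while urn in UPGRADES:
            urn = UPGRADES[urn]
        return urn

    def for_peer(urn, peer_understands):
        # Walk back version by version until the peer recognizes it.
        while urn not in peer_understands and urn in DOWNGRADES:
            urn = DOWNGRADES[urn]
        return urn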
>>>>> >>
>>>>> >> Being splittable means that the SDK could choose to return a continuation saying "please process the rest of my element in X amount of time", which would require the runner to inspect certain fields on responses. One example would be "I don't have many more messages to read from this message stream at the moment", and another could be "I detected that this filesystem is throttling me or is down, and I would like to resume processing later."
>>>>> >>
>>>>> >>> It isn't a huge deal. Three different top-level URNs versus three different sub-URNs will achieve the same result in the end if we get this "capability" thing in place.
>>>>> >>>
>>>>> >>> Kenn
>>>>> >>>
>>>>> >>>> >> > I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs since they are different paradigms than "vanilla" ParDo.
>>>>> >>>> >>
>>>>> >>>> >> Here I disagree. What about one that is both splittable and stateful? Would one have a fourth URN for that? If/when another flavor of DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in particular can be executed as a normal ParDo as long as the output is bounded.)
>>>>> >>>> >
>>>>> >>>> > I agree that you could have stateful and splittable DoFns where the element is the key and you share state and timers across restrictions. No runner is capable of executing this efficiently.
>>>>> >>>> >
>>>>> >>>> >> >> > On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.
>>>>> >>>> >> >>
>>>>> >>>> >> >> Yes.
>>>>> >>>> >
>>>>> >>>> > I believe capabilities do exist on a Pipeline, and they inform runners about new types of fields to be aware of, either within Components or on the Pipeline object itself, but for this discussion it makes sense that an environment would store most "capabilities" related to execution.
>>>>> >>>> >
>>>>> >>>> >> [snip]
>>>>> >>>> >
>>>>> >>>> > As for the proto clean-ups, the scope is to cover almost all things needed for execution now and to follow up with optional transforms, payloads, and coders later, which would exclude job management APIs and artifact staging. A formal enumeration would be useful here. Also, we should provide formal guidance about adding new fields, new types of transforms, new types of proto messages, ... (best to describe this on a case-by-case basis as people try to modify the protos, and evolve this guidance over time).
>>>>> >>>>
>>>>> >>>> What we need is the ability for (1) runners to reject future pipelines they cannot faithfully execute and (2) runners to be able to take advantage of advanced features/protocols when interacting with those SDKs that understand them, while avoiding them for older (or newer) SDKs that don't. Let's call (1) (hard) requirements and (2) (optional) capabilities.
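On the runner side, that MUST/MAY split might reduce to something like the following; names and URNs are again illustrative only:

    # Illustrative: requirements are MUST-understand (reject when
    # unknown); capabilities are MAY-understand (ignore when unknown,
    # exploit when advertised).
    def prepare(pipeline_requirements, known_urns, env_capabilities):
        unknown = set(pipeline_requirements) - set(known_urns)
        if unknown:
            raise ValueError(
                "unsupported requirements: %s" % sorted(unknown))
        # Only enable the optional protocol if the SDK advertises it.
        return "beam:protocol:progress_reporting:v1" in env_capabilities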
>>>>> >>>>
>>>>> >>>> Where possible, I think this is best expressed inherently in the set of transform (and possibly other component) URNs. For example, when an SDK uses a combine_per_key composite, that's a signal that it understands the various related combine_* transforms. Similarly, a pipeline with a test_stream URN would be rejected by runners not recognizing/supporting this primitive. However, this is not always possible: e.g. for (1) we have the aforementioned boolean flags on ParDo, and for (2) we have features like large-iterable and progress support.
>>>>> >>>>
>>>>> >>>> For (1) we have to enumerate now everywhere a runner must look as far into the future as we want to remain backwards compatible. This is why I suggested putting something on the pipeline itself, but we could (likely in addition) add it to Transform and/or ParDoPayload if we think that'd be useful now. (Note that a future pipeline-level requirement could be "inspect the (previously non-existent) requirements field attached to objects of type X.")
>>>>> >>>>
>>>>> >>>> For (2) I think adding a capabilities field to the environment makes the most sense for now, and as it's optional to inspect them, adding it elsewhere if needed is backwards compatible. (The motivation to do it now is that there are some capabilities that we'd like to enumerate now rather than make part of the minimal set of things an SDK must support.)
>>>>> >>
>>>>> >> Agree on the separation of requirements from capabilities, where requirements are a set of MUST-understands while capabilities are a set of MAY-understands.
>>>>> >>
>>>>> >>>> > All in all, I think "capabilities" is about informing a runner about what they should know about and what they are allowed to do. If we go with a list of "capabilities", we could always add a "parameterized capabilities" URN which would tell runners they need to also look at some other field.
>>>>> >>>>
>>>>> >>>> Good point. That lets us keep it as a list for now. (The risk is that it makes possible the bug of populating parameters without adding the required notification to the list.)
>>>>> >>>>
>>>>> >>>> > I also believe capabilities should NOT be "inherited". For example, if we define capabilities on a ParDoPayload, on a PTransform, and on an Environment, then ParDoPayload capabilities shouldn't be copied to the PTransform, and PTransform-specific capabilities shouldn't be copied to the Environment. My reasoning is that some "capabilities" can only be scoped to a single ParDoPayload or a single PTransform and wouldn't apply generally everywhere. The best example I could think of is that Environment A supports progress reporting while Environment B doesn't, so it wouldn't make sense to say the "Pipeline" supports progress reporting.
>>>>> >>>> >
>>>>> >>>> > Are capabilities strictly different from "resources" (transform needs Python package X) or "execution hints" (e.g. deploy on machines that have GPUs; some generic, but mostly runner-specific, hints)? At first glance I would say yes.
>>>>> >>>>
>>>>> >>>> Agreed.
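As a closing illustration, the "parameterized capabilities" escape hatch mentioned above could keep the plain list shape. A hypothetical sketch; the marker URN and the capability_parameters field are assumptions, not existing proto fields:

    # Hypothetical: a marker URN in the plain capability list tells the
    # reader it must also consult a separate parameters field.
    # Populating parameters without adding the marker is exactly the
    # bug Robert notes above.
    PARAMETERIZED = "beam:capability:parameterized:v1"

    def read_capabilities(environment):
        caps = set(environment.capabilities)
        if PARAMETERIZED in caps:
            caps |= {p.urn for p in environment.capability_parameters}
        return caps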
