I created https://github.com/apache/beam/pull/10873 to add the minimal
set of fields to the existing protos, and also created
https://docs.google.com/document/d/1CyVElQDYHBRfXu6k1VSXv3Yok_4r8c4V0bkh2nFAWYc/edit
to start enumerating some of the URNs we may want to have. It would be
a good milestone to get this in by the release next week.
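
For illustration, here is a minimal sketch (Python; a plain dict stands in
for the Pipeline proto, and the URNs are hypothetical placeholders rather
than anything from the doc) of the kind of check a runner could do against
such a requirements list:

    # Hypothetical illustration only: field and URN names are placeholders.
    SUPPORTED_REQUIREMENTS = {
        'beam:requirement:pardo:stateful:v1',
        'beam:requirement:pardo:time_sorted_input:v1',
    }

    def validate(pipeline):
        # 'requirements' stands in for a repeated-string field on the
        # Pipeline message listing must-understand URNs.
        unknown = set(pipeline.get('requirements', [])) - SUPPORTED_REQUIREMENTS
        if unknown:
            raise ValueError(
                'Cannot execute pipeline; unknown requirements: %s' % sorted(unknown))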

On Thu, Feb 13, 2020 at 5:14 PM Kyle Weaver <[email protected]> wrote:
>
> > we can take advantage of these pipeline features to get rid of the 
> > categories of @ValidatesRunner tests, because we could have simply 
> > @ValidatesRunner and each test would be matched against runner capabilities
>
> +1, I think the potential to formally integrate our idea of compatibility and 
> unit testing is a big advantage of this proposal. Also, when deciding where 
> to draw lines between different URNs, it may help to look at the existing 
> validates runner test categories, which are currently the most accurate 
> signal we have regarding a runner's capabilities.
>
> On Thu, Feb 13, 2020 at 4:04 PM Robert Burke <[email protected]> wrote:
>>
>> W.r.t. the per-DoFn/ParDo level, there's the similar case of whether the DoFn 
>> has a URN for requiring something or an annotation saying the DoFn 
>> provides something (e.g. provides k-anonymization with k defined).
>>
>> The general theme of this thread seems to be trying to ensure a runner can 
>> reject a pipeline if it's not able to provide the right guarantees, so the 
>> latter case isn't handled.
>>
>> E.g. the latter provisions could be used to analyze a pipeline to ensure the 
>> outputs are all properly anonymized to a certain degree at construction time.
>>
>> On Thu, Feb 13, 2020, 2:26 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>
>>>
>>> On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> +1 for adding pipeline required features. I think being able to reject a 
>>>> pipeline with an unknown requirement is pretty much needed, mostly because 
>>>> that enables runners to completely decouple from SDKs, while still being 
>>>> able to recognize when a pipeline constructed with an incompatible version 
>>>> of the SDK is run.
>>>>
>>>> I'll add some observations I made when implementing the latest "requires 
>>>> time sorted input" addition, with regard to this discussion:
>>>>
>>>>  a) the features of a pipeline are not a simple function of the set of 
>>>> PTransforms present in the pipeline, but also depend on the (type of) 
>>>> inputs. For instance, a PTransform might have a simple expansion to 
>>>> primitive PTransforms in the streaming case, but no such expansion in the 
>>>> batch case. That is to say, a runner that doesn't actually know of a 
>>>> specific extension to some PTransform _might_ execute it correctly under 
>>>> some conditions, but _must_ fail in other cases.
>>>>
>>>>  b) it would be good if this feature worked independently of 
>>>> portability (for the Java SDK). We still have (at least two) non-portable 
>>>> runners that are IMO widely used in production and are likely to last for 
>>>> some time.
>>>
>>> I think even if these runners keep executing without using portability, 
>>> they should migrate to use the portable pipeline definition. Then they can 
>>> share the same model w/ runners that execute using portability. The Fn API 
>>> is not required to be used as long as the runner implements the semantics 
>>> of the pipeline.
>>>
>>> Kenn
>>>
>>>>
>>>>  c) we can take advantage of these pipeline features to get rid of the 
>>>> categories of @ValidatesRunner tests, because we could have simply 
>>>> @ValidatesRunner and each test would be matched against runner 
>>>> capabilities (i.e. a runner would be tested with a given test if and only 
>>>> if it would not reject it).
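>>>>
>>>> As a rough sketch of that matching (Python; the URNs and function are 
>>>> purely illustrative, not an existing API):
>>>>
>>>>     # A test declares the URNs it needs; the runner declares what it supports.
>>>>     RUNNER_CAPABILITIES = {'beam:requirement:pardo:stateful:v1'}
>>>>
>>>>     def should_run(test_required_urns, runner_capabilities=RUNNER_CAPABILITIES):
>>>>         # Run the @ValidatesRunner test iff the runner would not reject the
>>>>         # pipeline, i.e. it understands every URN the test requires.
>>>>         return set(test_required_urns) <= runner_capabilities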
>>>>
>>>> Jan
>>>>
>>>> On 2/13/20 8:42 PM, Robert Burke wrote:
>>>>
>>>> +1 to deferring for now. Since they should not be modified after adoption, 
>>>> it makes sense not to get ahead of ourselves.
>>>>
>>>> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <[email protected]> wrote:
>>>>>
>>>>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <[email protected]> wrote:
>>>>> >
>>>>> > One thing that doesn't appear to have been suggested yet is that we could 
>>>>> > "batch" URNs together under a "super urn", so that adding one super urn 
>>>>> > is like adding each of the features in the batch it represents. This 
>>>>> > avoids needing to send dozens of URNs individually.
>>>>> >
>>>>> >
>>>>> > The super urns would need to be static after definition to avoid 
>>>>> > mismatched definitions down the road.
>>>>> >
>>>>> > We'd collect together the URNs that are reasonably considered "vX" 
>>>>> > support, and can then increment that later.
>>>>> >
>>>>> > This would simplify new SDKs, as they could target initial v1 support as 
>>>>> > we define what level of feature support that entails, and it doesn't 
>>>>> > prevent new capabilities from being added incrementally.
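>>>>> >
>>>>> > As a sketch (Python; the super urn is hypothetical and the member URNs 
>>>>> > are just illustrative), a super urn would simply expand to its member 
>>>>> > URNs before any requirement/capability check:
>>>>> >
>>>>> >     SUPER_URNS = {
>>>>> >         'beam:super:sdk_base:v1': {
>>>>> >             'beam:coder:length_prefix:v1',
>>>>> >             'beam:transform:pardo:v1',
>>>>> >             'beam:transform:group_by_key:v1',
>>>>> >         },
>>>>> >     }
>>>>> >
>>>>> >     def expand(urns):
>>>>> >         expanded = set()
>>>>> >         for urn in urns:
>>>>> >             # A super urn counts as declaring each member; others pass through.
>>>>> >             expanded |= SUPER_URNS.get(urn, {urn})
>>>>> >         return expanded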
>>>>>
>>>>> Yes, this is a very good idea. I've also been thinking of certain sets 
>>>>> of common operations/well-known DoFns that often occur on opposite 
>>>>> sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) and 
>>>>> are commonly supported; these could be grouped under such meta-urns.
>>>>>
>>>>> Note that these need not be monotonic; for example, a current v1 might 
>>>>> require LengthPrefixCoderV1, but if a more efficient 
>>>>> LengthPrefixCoderV2 comes along, eventually v2 could require that and 
>>>>> *not* require the old, now rarely used LengthPrefixCoderV1.
>>>>>
>>>>> Probably makes sense to defer adding such super-urns until we notice a
>>>>> set that is commonly used together in practice.
>>>>>
>>>>> Of course there's still value in SDKs being able to support features
>>>>> piecemeal as well, which is the big reason we're avoiding a simple
>>>>> monotonically-increasing version number.
>>>>>
>>>>> > Similarly, certain feature sets could stand alone, e.g. around SQL. It's 
>>>>> > beneficial for optimization reasons if an SDK has native projection and 
>>>>> > UDF support, for example, which a runner could take advantage of by 
>>>>> > avoiding extra cross-language hops. These could then also be grouped 
>>>>> > under a SQL super urn.
>>>>> >
>>>>> > This is from the SDK capability side of course, rather than the SDK 
>>>>> > pipeline requirements side.
>>>>> >
>>>>> > -------
>>>>> > Related to that last point, it might be good to nail down early the 
>>>>> > perspective used when discussing these things, as there's a duality 
>>>>> > between "what an SDK can do" and "what the runner will do to a pipeline 
>>>>> > that the SDK can understand" (e.g. combiner lifting, and state-backed 
>>>>> > iterables), as well as between "what the pipeline requires from the 
>>>>> > runner" and "what the runner is able to do" (e.g. requires sorted input).
>>>>> >
>>>>> >
>>>>> > On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <[email protected]> wrote:
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <[email protected]> 
>>>>> >> wrote:
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw 
>>>>> >>> <[email protected]> wrote:
>>>>> >>>>
>>>>> >>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <[email protected]> wrote:
>>>>> >>>> >
>>>>> >>>> > We can always detect on the runner/SDK side whether there is an 
>>>>> >>>> > unknown field[1] within a payload and fail to process it, but this 
>>>>> >>>> > is painful in two situations:
>>>>> >>>> > 1) It doesn't provide for a good error message since you can't say 
>>>>> >>>> > what the purpose of the field is. With a capability URN, the 
>>>>> >>>> > runner/SDK could say which URN it doesn't understand.
>>>>> >>>> > 2) It doesn't allow for the addition of fields which don't impact 
>>>>> >>>> > the semantics of execution. For example, if the display data feature 
>>>>> >>>> > were being developed, a runner could ignore it and still execute 
>>>>> >>>> > the pipeline correctly.
>>>>> >>>>
>>>>> >>>> Yeah, I don't think proto reflection is a flexible enough tool to do
>>>>> >>>> this well either.
>>>>> >>>>
>>>>> >>>> > If we think this is common enough, we can add a capabilities list 
>>>>> >>>> > to the PTransform so each PTransform can do this and has a natural 
>>>>> >>>> > way of being extended for additions which are forwards compatible. 
>>>>> >>>> > The alternative to having capabilities on the PTransform (and other 
>>>>> >>>> > constructs) is that we would have a new URN when the specification 
>>>>> >>>> > of the transform changes. For forwards-compatible changes, each 
>>>>> >>>> > SDK/runner would map older versions of the URN onto the latest and 
>>>>> >>>> > internally treat it as the latest version, but always downgrade it 
>>>>> >>>> > to the version the other party expects when communicating with it. 
>>>>> >>>> > Backwards-incompatible changes would always require a new URN, 
>>>>> >>>> > which capabilities at the PTransform level would not help with.
>>>>> >>>>
>>>>> >>>> As you point out, stateful+splittable may not be a particularly 
>>>>> >>>> useful
>>>>> >>>> combination, but as another example, we have
>>>>> >>>> (backwards-incompatible-when-introduced) markers on DoFn as to 
>>>>> >>>> whether
>>>>> >>>> it requires finalization, stable inputs, and now time sorting. I 
>>>>> >>>> don't
>>>>> >>>> think we should have a new URN for each combination.
>>>>> >>>
>>>>> >>>
>>>>> >>> Agree with this. I don't think stateful, splittable, and "plain" 
>>>>> >>> ParDo are comparable to these. Each is an entirely different 
>>>>> >>> computational paradigm: per-element independent processing, 
>>>>> >>> per-key-and-window linear processing, and per-element-and-restriction 
>>>>> >>> splittable processing. Most relevant IMO is the nature of the 
>>>>> >>> parallelism. If you added state to splittable processing, it would 
>>>>> >>> still be splittable processing. Just as Combine and ParDo can share 
>>>>> >>> the SideInput specification, it is easy to share relevant 
>>>>> >>> sub-structures like state declarations. But it is a fair point that 
>>>>> >>> the ability to split can be ignored and the transform run as a plain 
>>>>> >>> old ParDo. It brings up the question of whether a runner that doesn't 
>>>>> >>> know SDF should have to reject it or should be allowed to run it poorly.
>>>>> >>
>>>>> >>
>>>>> >> Being splittable means that the SDK could choose to return a 
>>>>> >> continuation saying "please process the rest of my element in X amount 
>>>>> >> of time", which would require the runner to inspect certain fields on 
>>>>> >> responses. One example would be "I don't have many more messages to 
>>>>> >> read from this message stream at the moment", and another example could 
>>>>> >> be "I detected that this filesystem is throttling me or is down 
>>>>> >> and I would like to resume processing later."
>>>>> >>
>>>>> >>>
>>>>> >>> It isn't a huge deal. Three different top-level URNs versus three 
>>>>> >>> different sub-URNs will achieve the same result in the end if we get 
>>>>> >>> this "capability" thing in place.
>>>>> >>>
>>>>> >>> Kenn
>>>>> >>>
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> >> > I do think that splittable ParDo and stateful ParDo should have 
>>>>> >>>> >> > separate PTransform URNs since they are different paradigms 
>>>>> >>>> >> > than "vanilla" ParDo.
>>>>> >>>> >>
>>>>> >>>> >> Here I disagree. What about one that is both splittable and 
>>>>> >>>> >> stateful? Would one have a fourth URN for that? If/when another 
>>>>> >>>> >> flavor of DoFn comes out, would we then want 8 distinct URNs? 
>>>>> >>>> >> (SplittableParDo in particular can be executed as a normal ParDo 
>>>>> >>>> >> as long as the output is bounded.)
>>>>> >>>> >
>>>>> >>>> > I agree that you could have stateful and splittable DoFns where 
>>>>> >>>> > the element is the key and you share state and timers across 
>>>>> >>>> > restrictions. No runner is capable of executing this efficiently.
>>>>> >>>> >
>>>>> >>>> >> >> > On the SDK requirements side: the constructing SDK owns the 
>>>>> >>>> >> >> > Environment proto completely, so it is in a position to 
>>>>> >>>> >> >> > ensure the involved docker images support the necessary 
>>>>> >>>> >> >> > features.
>>>>> >>>> >> >>
>>>>> >>>> >> >> Yes.
>>>>> >>>> >
>>>>> >>>> >
>>>>> >>>> > I believe capabilities do exist on a Pipeline: they inform 
>>>>> >>>> > runners about new types of fields to be aware of, either within 
>>>>> >>>> > Components or on the Pipeline object itself. But for this 
>>>>> >>>> > discussion it makes sense that an environment would store most 
>>>>> >>>> > "capabilities" related to execution.
>>>>> >>>> >
>>>>> >>>> >> [snip]
>>>>> >>>> >
>>>>> >>>> > As for the proto clean-ups, the scope is to cover almost all 
>>>>> >>>> > things needed for execution now and to follow up with optional 
>>>>> >>>> > transforms, payloads, and coders later, which would exclude job 
>>>>> >>>> > management APIs and artifact staging. A formal enumeration would be 
>>>>> >>>> > useful here. Also, we should provide formal guidance about adding 
>>>>> >>>> > new fields, new types of transforms, new types of proto 
>>>>> >>>> > messages, ... (best to describe this on a case-by-case basis as 
>>>>> >>>> > people try to modify the protos, and evolve this guidance over 
>>>>> >>>> > time).
>>>>> >>>>
>>>>> >>>> What we need is the ability for (1) runners to reject future 
>>>>> >>>> pipelines
>>>>> >>>> they cannot faithfully execute and (2) runners to be able to take
>>>>> >>>> advantage of advanced features/protocols when interacting with those
>>>>> >>>> SDKs that understand them while avoiding them for older (or newer)
>>>>> >>>> SDKs that don't. Let's call (1) (hard) requirements and (2) 
>>>>> >>>> (optional)
>>>>> >>>> capabilities.
>>>>> >>>>
>>>>> >>>> Where possible, I think this is best expressed inherently in the set 
>>>>> >>>> of transform (and possibly other component) URNs. For example, when an 
>>>>> >>>> SDK uses a combine_per_key composite, that's a signal that it 
>>>>> >>>> understands the various related combine_* transforms. Similarly, a 
>>>>> >>>> pipeline with a test_stream URN would be rejected by runners not 
>>>>> >>>> recognizing/supporting this primitive. However, this is not always 
>>>>> >>>> possible, e.g. for (1) we have the aforementioned boolean flags on 
>>>>> >>>> ParDo and for (2) we have features like large iterable and progress 
>>>>> >>>> support.
>>>>> >>>>
>>>>> >>>> For (1) we have to enumerate now everywhere a runner must look as far 
>>>>> >>>> into the future as we want to remain backwards compatible. This is why 
>>>>> >>>> I suggested putting something on the pipeline itself, but we could 
>>>>> >>>> (likely in addition) add it to Transform and/or ParDoPayload if we 
>>>>> >>>> think that'd be useful now. (Note that a future pipeline-level 
>>>>> >>>> requirement could be "inspect the (previously non-existent) 
>>>>> >>>> requirements field attached to objects of type X.")
>>>>> >>>>
>>>>> >>>> For (2) I think adding a capabilities field to the environment for now 
>>>>> >>>> makes the most sense, and as it's optional to inspect them, adding it 
>>>>> >>>> elsewhere if needed is backwards compatible. (The motivation to do it 
>>>>> >>>> now is that there are some capabilities that we'd like to enumerate 
>>>>> >>>> now rather than make part of the minimal set of things an SDK must 
>>>>> >>>> support.)
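>>>>> >>>>
>>>>> >>>> To illustrate the asymmetry with (1) (sketch only; the 'capabilities' 
>>>>> >>>> field and the URN below are placeholders): a capability on the 
>>>>> >>>> environment only gates whether the runner opts into an optional 
>>>>> >>>> protocol, and never causes a rejection:
>>>>> >>>>
>>>>> >>>>     def use_state_backed_iterables(environment):
>>>>> >>>>         # Optional: use the protocol if the SDK declares it, otherwise
>>>>> >>>>         # silently fall back -- never fail the pipeline over this.
>>>>> >>>>         return ('beam:protocol:state_backed_iterables:v1'
>>>>> >>>>                 in environment.get('capabilities', []))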
>>>>> >>>>
>>>>> >>
>>>>> >> Agree on the separation of requirements from capabilities, where 
>>>>> >> requirements are a set of MUST-understands while capabilities are a set 
>>>>> >> of MAY-understands.
>>>>> >>
>>>>> >>>>
>>>>> >>>> > All in all, I think "capabilities" is about informing a runner 
>>>>> >>>> > about what it should know and what it is allowed to do. 
>>>>> >>>> > If we go with a list of "capabilities", we could always add a 
>>>>> >>>> > "parameterized capabilities" urn which would tell runners they 
>>>>> >>>> > need to also look at some other field.
>>>>> >>>>
>>>>> >>>> Good point. That lets us keep it as a list for now. (The risk is that
>>>>> >>>> it makes possible the bug of populating parameters without adding the
>>>>> >>>> required notification to the list.)
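>>>>> >>>>
>>>>> >>>> Sketching that (everything here is hypothetical): the parameterized 
>>>>> >>>> capability urn just tells the runner to consult an extra field, and 
>>>>> >>>> the bug above would be populating that field without listing the urn:
>>>>> >>>>
>>>>> >>>>     PARAMETERIZED = 'beam:capability:parameterized:v1'
>>>>> >>>>
>>>>> >>>>     def effective_capabilities(env):
>>>>> >>>>         caps = set(env.get('capabilities', []))
>>>>> >>>>         if PARAMETERIZED in caps:
>>>>> >>>>             # Only inspected when the urn is present in the list.
>>>>> >>>>             caps |= set(env.get('capability_parameters', []))
>>>>> >>>>         return caps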
>>>>> >>>>
>>>>> >>>> > I also believe capabilities should NOT be "inherited". For example, 
>>>>> >>>> > if we define capabilities on a ParDoPayload, on a PTransform, 
>>>>> >>>> > and on an Environment, then ParDoPayload capabilities shouldn't be 
>>>>> >>>> > copied to the PTransform and PTransform-specific capabilities 
>>>>> >>>> > shouldn't be copied to the Environment. My reasoning is 
>>>>> >>>> > that some "capabilities" can only be scoped to a single 
>>>>> >>>> > ParDoPayload or a single PTransform and wouldn't apply generally 
>>>>> >>>> > everywhere. The best example I can think of is that Environment 
>>>>> >>>> > A supports progress reporting while Environment B doesn't, so it 
>>>>> >>>> > wouldn't make sense to say the "Pipeline" supports progress 
>>>>> >>>> > reporting.
>>>>> >>>> >
>>>>> >>>> > Are capabilities strictly different from "resources" (transform 
>>>>> >>>> > needs Python package X) or "execution hints" (e.g. deploy on 
>>>>> >>>> > machines that have GPUs, some generic but mostly runner-specific 
>>>>> >>>> > hints)? At first glance I would say yes.
>>>>> >>>>
>>>>> >>>> Agreed.
