Good question. My last sentence was not clear. We do not need to
automatically propagate the capabilities offered by runners-core to a
particular runner. The runner can (and should) own the claim of what its
capabilities are.

Kenn

On Thu, Feb 20, 2020 at 10:05 PM Luke Cwik <lc...@google.com> wrote:

> Which part of the proposal do you think is solving a problem we may not
> have?
>
> On Thu, Feb 20, 2020 at 8:19 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> I would rather say that "runners-core" is a utility library with some
>> helpful things. Like other libraries. The runner still decides how to use
>> the library. That was the idea, anyhow. A runner could have a bunch of "if"
>> statements around how it uses some generic runners-core utility, etc. I
>> think at this point the proposal is trying to solve a problem we may not
>> have.
>>
>> Kenn
>>
>> On Thu, Feb 20, 2020 at 1:25 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>>
>>> On 2/20/20 8:24 PM, Robert Bradshaw wrote:
>>>
>>> On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>> Hi,
>>>
>>> +1 for adding pipeline required features. I think being able to reject a
>>> pipeline with an unknown requirement is pretty much needed, mostly because
>>> that enables runners to completely decouple from SDKs while still being
>>> able to recognize when a pipeline constructed with an incompatible version
>>> of the SDK is run.
>>>
>>> I'll add some observations I made when implementing the latest "requires
>>> time sorted input" addition, with regard to this discussion:
>>>
>>>  a) the features of a pipeline are not a simple function of the set of
>>> PTransforms present in the pipeline, but also depend on the (type of)
>>> inputs. For instance, a PTransform might have a simple expansion to
>>> primitive PTransforms in the streaming case, but no such expansion in the
>>> batch case. That is to say, a runner that doesn't actually know of a
>>> specific extension to some PTransform _might_ execute it correctly under
>>> some conditions, but _must_ fail in other cases.
>>>
>>> It sounds like what you're getting at here is a Stateful ParDo that
>>> requires "mostly" time sorted input (to keep the amount of state held
>>> bounded) which is somewhat provided (with no bounds given) for
>>> unbounded PCollections but not at all (in general) for batch. Rather
>>> than phrase this as a conditional requirement, I would make a new
>>> requirement "requires mostly time sorted input" (precise definition
>>> TBD, it's hard to specify or guarantee upper bounds) which a runner
>>> could then implement via exact time sorted input in batch but more
>>> cheaply as a no-op in streaming.
>>>
>>> +1, that makes sense. My example was a little incomplete, in the sense
>>> that @RequiresTimeSortedInput does not impose any requirements on the
>>> runner in the streaming case, with one exception - the runner must be
>>> compiled against the newest runners-core. That brings us to the fact that
>>> a runner's capabilities are actually not just a function of the runner's
>>> code, but also of the code imported from runners-core. There probably
>>> should be a way for the core to export its capabilities (e.g. provides:
>>> beam:requirement:pardo:time_sorted_input:streaming:v1), which would
>>> then be united with the capabilities of the runner itself. That way a
>>> runner which uses runners-core (and StatefulDoFnRunner, which is a
>>> complication, not sure how to deal with that) could be made able to
>>> satisfy 'beam:requirement:pardo:time_sorted_input:streaming:v1'
>>> simply by recompiling the runner against the newest core.
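>>>
>>> To illustrate, a minimal Java sketch of what such an export could look
>>> like (the class and method names are hypothetical; nothing like this
>>> exists in runners-core today):
>>>
>>> import com.google.common.collect.ImmutableSet;
>>> import com.google.common.collect.Sets;
>>> import java.util.Set;
>>>
>>> // Hypothetical: runners-core declares the capability URNs that its
>>> // utilities (e.g. StatefulDoFnRunner) provide when compiled in.
>>> final class RunnersCoreCapabilities {
>>>   private RunnersCoreCapabilities() {}
>>>
>>>   static Set<String> provided() {
>>>     return ImmutableSet.of(
>>>         "beam:requirement:pardo:time_sorted_input:streaming:v1");
>>>   }
>>>
>>>   // A runner would report the union of its own claims and the claims
>>>   // of the runners-core version it was compiled against.
>>>   static Set<String> unionWith(Set<String> runnerOwnCapabilities) {
>>>     return Sets.union(runnerOwnCapabilities, provided());
>>>   }
>>> }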
>>>
>>>  b) it would be good if this feature worked independently of portability
>>> (for the Java SDK). We still have (at least two) non-portable runners that
>>> are IMO widely used in production and are likely to last for some time.
>>>
>>> Yes. As mentioned, we can still convert to portability to do such
>>> analysis even if we don't use it for execution.
>>>
>>>
>>>  c) we can take advantage of these pipeline features to get rid of the
>>> categories of @ValidatesRunner tests, because we could have simply
>>> @ValidatesRunner and each test would be matched against runner
>>> capabilities (i.e. a runner would be tested with a given test if and only
>>> if it would not reject it)
>>>
>>> +1
>>>
>>>
>>> Jan
>>>
>>> On 2/13/20 8:42 PM, Robert Burke wrote:
>>>
>>> +1 to deferring for now. Since they should not be modified after adoption, 
>>> it makes sense not to get ahead of ourselves.
>>>
>>> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <rob...@frantil.com> wrote:
>>>
>>> One thing that doesn't appear to have been suggested yet is we could
>>> "batch" URNs together under a "super URN" so that adding one super URN is
>>> like adding each of the features in the batch it represents. This avoids
>>> needing to send dozens of URNs individually.
>>>
>>>
>>> The super urns would need to be static after definition to avoid mismatched 
>>> definitions down the road.
>>>
>>> We would collect together the URNs that reasonably constitute "vX"
>>> support, and could then increment that later.
>>>
>>> This would simplify new SDKs, as they could target initial v1 support
>>> once we define what level of feature support that entails, and it doesn't
>>> prevent new capabilities from being added incrementally.
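>>>
>>> As a rough sketch of the expansion (the super URN and its member URNs
>>> below are invented for illustration):
>>>
>>> import com.google.common.collect.ImmutableMap;
>>> import com.google.common.collect.ImmutableSet;
>>> import java.util.HashSet;
>>> import java.util.Map;
>>> import java.util.Set;
>>>
>>> final class SuperUrns {
>>>   // A super URN is frozen after definition, so both sides always
>>>   // agree on exactly which member URNs it stands for.
>>>   private static final Map<String, Set<String>> SUPER_URNS =
>>>       ImmutableMap.of(
>>>           "beam:version:sdk_base:v1",
>>>           ImmutableSet.of(
>>>               "beam:coder:length_prefix:v1",
>>>               "beam:transform:pardo:v1",
>>>               "beam:transform:group_by_key:v1"));
>>>
>>>   // Expanding a declared set replaces each super URN by its members,
>>>   // so consumers only ever compare plain URNs.
>>>   static Set<String> expand(Set<String> declared) {
>>>     Set<String> result = new HashSet<>();
>>>     for (String urn : declared) {
>>>       result.addAll(SUPER_URNS.getOrDefault(urn, ImmutableSet.of(urn)));
>>>     }
>>>     return result;
>>>   }
>>> }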
>>>
>>> Yes, this is a very good idea. I've also been thinking of certain sets
>>> of common operations/well-known DoFns that often occur on opposite
>>> sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) and
>>> are commonly supported, which could be grouped under these meta-URNs.
>>>
>>> Note that these need not be monotonic: for example, a current v1 might
>>> require LengthPrefixCoderV1, but if a more efficient
>>> LengthPrefixCoderV2 comes along, eventually v2 could require that and
>>> *not* require the old, by-then rarely used LengthPrefixCoderV1.
>>>
>>> Probably makes sense to defer adding such super-urns until we notice a
>>> set that is commonly used together in practice.
>>>
>>> Of course there's still value in SDKs being able to support features
>>> piecemeal as well, which is the big reason we're avoiding a simple
>>> monotonically-increasing version number.
>>>
>>>
>>> Similarly, certain feature sets could stand alone, e.g. around SQL. It's
>>> beneficial for optimization reasons if an SDK has native projection and
>>> UDF support, for example, which a runner could take advantage of by
>>> avoiding extra cross-language hops. These could then also be grouped
>>> under a SQL super URN.
>>>
>>> This is from the SDK capability side of course, rather than the SDK 
>>> pipeline requirements side.
>>>
>>> -------
>>> Related to that last point, it might be good to nail down early the
>>> perspective used when discussing these things, as there's a duality
>>> between "what an SDK can do" and "what the runner will do to a pipeline
>>> that the SDK can understand" (e.g. combiner lifting and state-backed
>>> iterables), as well as between "what the pipeline requires from the
>>> runner" and "what the runner is able to do" (e.g. requires sorted input).
>>>
>>>
>>> On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
>>>
>>> We can always detect on the runner/SDK side whether there is an unknown
>>> field[1] within a payload and fail to process it, but this is painful in
>>> two situations:
>>> 1) It doesn't provide for a good error message since you can't say what the 
>>> purpose of the field is. With a capability URN, the runner/SDK could say 
>>> which URN it doesn't understand.
>>> 2) It doesn't allow for the addition of fields which don't impact semantics 
>>> of execution. For example, if the display data feature was being developed, 
>>> a runner could ignore it and still execute the pipeline correctly.
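>>>
>>> For reference, a minimal sketch of that reflection check using the
>>> standard protobuf Java API; note that the error can only name a field's
>>> tag number, not its purpose:
>>>
>>> import com.google.protobuf.Message;
>>>
>>> final class UnknownFieldCheck {
>>>   // Fails on any payload carrying fields that this binary's proto
>>>   // definitions do not know about.
>>>   static void checkNoUnknownFields(Message payload) {
>>>     if (!payload.getUnknownFields().asMap().isEmpty()) {
>>>       throw new IllegalArgumentException(
>>>           "Payload has unknown field(s) with tag number(s) "
>>>               + payload.getUnknownFields().asMap().keySet());
>>>     }
>>>   }
>>> }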
>>>
>>> Yeah, I don't think proto reflection is a flexible enough tool to do
>>> this well either.
>>>
>>>
>>> If we think this is common enough, we can add a capabilities list to the
>>> PTransform so each PTransform can do this and has a natural way of being
>>> extended with forwards-compatible additions. The alternative to having
>>> capabilities on PTransform (and other constructs) is that we would have
>>> a new URN whenever the specification of the transform changes. For
>>> forwards-compatible changes, each SDK/runner would map older versions of
>>> the URN onto the latest and internally treat it as the latest version,
>>> but always downgrade it to the version the other party expects when
>>> communicating with it. Backwards-incompatible changes would always
>>> require a new URN, which capabilities at the PTransform level would not
>>> help with.
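>>>
>>> A sketch of that version mapping (the foo URNs are made up; this is just
>>> the shape of the idea):
>>>
>>> import java.util.Set;
>>>
>>> final class UrnVersions {
>>>   // Internally, treat every known version of the transform as the
>>>   // latest one (legal for forwards-compatible changes).
>>>   static String canonicalize(String urn) {
>>>     if (urn.equals("beam:transform:foo:v1")) {
>>>       return "beam:transform:foo:v2";
>>>     }
>>>     return urn;
>>>   }
>>>
>>>   // When talking to a peer, downgrade to the newest version the peer
>>>   // declares it understands.
>>>   static String forPeer(String urn, Set<String> peerCapabilities) {
>>>     if (urn.equals("beam:transform:foo:v2")
>>>         && !peerCapabilities.contains("beam:transform:foo:v2")) {
>>>       return "beam:transform:foo:v1";
>>>     }
>>>     return urn;
>>>   }
>>> }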
>>>
>>> As you point out, stateful+splittable may not be a particularly useful
>>> combination, but as another example, we have
>>> (backwards-incompatible-when-introduced) markers on DoFn as to whether
>>> it requires finalization, stable inputs, and now time sorting. I don't
>>> think we should have a new URN for each combination.
>>>
>>> Agree with this. I don't think stateful, splittable, and "plain" ParDo are 
>>> comparable to these. Each is an entirely different computational paradigm: 
>>> per-element independent processing, per-key-and-window linear processing, 
>>> and per-element-and-restriction splittable processing. Most relevant IMO is 
>>> the nature of the parallelism. If you added state to splittable processing, 
>>> it would still be splittable processing. Just as Combine and ParDo can 
>>> share the SideInput specification, it is easy to share relevant 
>>> sub-structures like state declarations. But it is a fair point that the 
>>> ability to split can be ignored and run as a plain-old ParDo. It brings up 
>>> the question of whether a runner that doesn't know SDF should have to
>>> reject it or should be allowed to run it poorly.
>>>
>>> Being splittable means that the SDK could choose to return a continuation
>>> saying "please process the rest of my element in X amount of time", which
>>> would require the runner to inspect certain fields on responses. One
>>> example would be "I don't have many more messages to read from this
>>> message stream at the moment"; another could be "I detected that this
>>> filesystem is throttling me or is down, and I would like to resume
>>> processing later".
>>>
>>>
>>> It isn't a huge deal. Three different top-level URNs versus three
>>> different sub-URNs will achieve the same result in the end if we get this
>>> "capability" thing in place.
>>>
>>> Kenn
>>>
>>>
>>> I do think that splittable ParDo and stateful ParDo should have separate 
>>> PTransform URNs since they are different paradigms than "vanilla" ParDo.
>>>
>>> Here I disagree. What about one that is both splittable and stateful?
>>> Would one have a fourth URN for that? If/when another flavor of DoFn
>>> comes out, would we then want 8 distinct URNs? (SplittableParDo in
>>> particular can be executed as a normal ParDo as long as the output is
>>> bounded.)
>>>
>>> I agree that you could have stateful and splittable DoFns where the
>>> element is the key and you share state and timers across restrictions. No
>>> runner is capable of executing this efficiently.
>>>
>>>
>>> On the SDK requirements side: the constructing SDK owns the Environment 
>>> proto completely, so it is in a position to ensure the involved docker 
>>> images support the necessary features.
>>>
>>> Yes.
>>>
>>> I believe capabilities do exist on a Pipeline, informing runners about
>>> new types of fields to be aware of, either within Components or on the
>>> Pipeline object itself, but for this discussion it makes sense that an
>>> environment would store most "capabilities" related to execution.
>>>
>>>
>>> [snip]
>>>
>>> As for the proto clean-ups, the scope is to cover almost all things
>>> needed for execution now and to follow up with optional transforms,
>>> payloads, and coders later, which would exclude job management APIs and
>>> artifact staging. A formal enumeration would be useful here. Also, we
>>> should provide formal guidance about adding new fields, adding new types
>>> of transforms, new types of proto messages, ... (best to describe this on
>>> a case-by-case basis as people try to modify the protos, and evolve this
>>> guidance over time).
>>>
>>> What we need is the ability for (1) runners to reject future pipelines
>>> they cannot faithfully execute and (2) runners to be able to take
>>> advantage of advanced features/protocols when interacting with those
>>> SDKs that understand them while avoiding them for older (or newer)
>>> SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
>>> capabilities.
>>>
>>> Where possible, I think this is best expressed inherently in the set
>>> of transform (and possibly other component) URNs. For example, when an
>>> SDK uses a combine_per_key composite, that's a signal that it
>>> understands the various related combine_* transforms. Similarly, a
>>> pipeline with a test_stream URN would be rejected by runners not
>>> recognizing/supporting this primitive. However, this is not always
>>> possible, e.g. for (1) we have the aforementioned boolean flags on
>>> ParDo and for (2) we have features like large iterable and progress
>>> support.
>>>
>>> For (1) we have to enumerate now everywhere a runner must look, as far
>>> into the future as we want to remain backwards compatible. This is why
>>> I suggested putting something on the pipeline itself, but we could
>>> (likely in addition) add it to Transform and/or ParDoPayload if we
>>> think that'd be useful now. (Note that a future pipeline-level
>>> requirement could be "inspect the (previously non-existent) requirements
>>> field attached to objects of type X.")
>>>
>>> For (2) I think adding a capabilities field to the environment for now
>>> makes the most sense, and as it's optional to inspect them adding it
>>> elsewhere if needed is backwards compatible. (The motivation to do it
>>> now is that there are some capabilities that we'd like to enumerate
>>> now rather than make part of the minimal set of things an SDK must
>>> support.)
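>>>
>>> A small sketch of both checks, assuming a requirements field on the
>>> Pipeline proto and a capabilities field on Environment as proposed here
>>> (neither exists yet):
>>>
>>> import java.util.List;
>>> import java.util.Set;
>>>
>>> final class CompatChecks {
>>>   // (1) Hard requirements: reject anything we do not understand.
>>>   static void validateRequirements(
>>>       List<String> pipelineRequirements, Set<String> runnerUnderstands) {
>>>     for (String requirement : pipelineRequirements) {
>>>       if (!runnerUnderstands.contains(requirement)) {
>>>         throw new UnsupportedOperationException(
>>>             "Unknown pipeline requirement: " + requirement);
>>>       }
>>>     }
>>>   }
>>>
>>>   // (2) Optional capabilities: only use an advanced protocol when the
>>>   // environment declares it; otherwise fall back.
>>>   static boolean mayUse(List<String> envCapabilities, String urn) {
>>>     return envCapabilities.contains(urn);
>>>   }
>>> }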
>>>
>>>
>>> Agree on the separation of requirements from capabilities where 
>>> requirements is a set of MUST understand while capabilities are a set of 
>>> MAY understand.
>>>
>>>
>>> All in all, I think "capabilities" is about informing a runner about what
>>> it should know about and what it is allowed to do. If we go with a list
>>> of "capabilities", we could always add a "parameterized capabilities" URN
>>> which would tell runners they need to also look at some other field.
>>>
>>> Good point. That lets us keep it as a list for now. (The risk is that
>>> it makes possible the bug of populating parameters without adding the
>>> required notification to the list.)
>>>
>>>
>>> I also believe capabilities should NOT be "inherited". For example, if we
>>> define capabilities on a ParDoPayload, on a PTransform, and on an
>>> Environment, then ParDoPayload capabilities shouldn't be copied to the
>>> PTransform, and PTransform-specific capabilities shouldn't be copied to
>>> the Environment. My reasoning is that some "capabilities" can only be
>>> scoped to a single ParDoPayload or a single PTransform and wouldn't apply
>>> generally everywhere. The best example I can think of is that Environment
>>> A supports progress reporting while Environment B doesn't, so it wouldn't
>>> make sense to say the "Pipeline" supports progress reporting.
>>>
>>> Are capabilities strictly different from "resources" (a transform needs
>>> Python package X) or "execution hints" (e.g. deploy on machines that have
>>> GPUs; some hints are generic but most are runner-specific)? At first
>>> glance I would say yes.
>>>
>>> Agreed.
>>>
>>>
