On Wed, Feb 12, 2020 at 7:57 AM Robert Bradshaw <[email protected]> wrote:
> On Tue, Feb 11, 2020 at 7:25 PM Kenneth Knowles <[email protected]> wrote:
> >
> > On Tue, Feb 11, 2020 at 8:38 AM Robert Bradshaw <[email protected]> wrote:
> >>
> >> On Mon, Feb 10, 2020 at 7:35 PM Kenneth Knowles <[email protected]> wrote:
> >> >
> >> > On the runner requirements side: if you have such a list at the pipeline level, it is an opportunity for the list to be inconsistent with the contents of the pipeline. For example, if a DoFn is marked "requires stable input" but not listed at the pipeline level, then the runner may run it without ensuring stable input.
> >>
> >> Yes. Listing this feature at the top level, if used, would be part of the contract. The problem here that we're trying to solve is that the runner wouldn't know about the field used to mark a DoFn as "requires stable input." Another alternative would be to make this kind of ParDo a different URN, but that would result in a cross product of URNs for all supported features.
> >
> >
> >>
> >> Rather than attaching it to the pipeline object, we could attach it to the transform. (But if there are ever extensions that don't belong to transforms, we'd be out of luck. It'd be even worse to attach it to the ParDoPayload, as then we'd need one on CombinePayload, etc. just in case.) This is why I was leaning towards just putting it at the top.
> >>
> >> I agree about the potential for incompatibility. As much as possible I'd rather extend things in a way that would be intrinsically rejected by a non-comprehending runner. But I'm not sure how to do that when introducing new constraints for existing components like this. I'm open to other suggestions.
> >
> >
> > I was waiting for Luke to mention something he suggested offline: that we make this set of fields a list of URNs and require a runner to fail if there are any that it does not understand. That should do it for DoFn-granularity features. It makes sense - proto is designed to ignore/propagate unknown bits. We want to fail on unknown bits.
>
> I agree this would be superior for bools like requires_time_sorted_input and requests_finalization. Would it be worth making this a map for those features that have attached data, such that it could not be forgotten? (E.g. rather than state_specs being a top-level field, it would be a value for the requires-state URN.) Should we move to this pattern for existing requirements (like the aforementioned state) or just future ones? Was the parameters field an attempt in this direction?
>
> I still think we need something top-level lest we not be able to modify anything but ParDo, but putting it on ParDo as well could be natural.

We can always detect on the runner/SDK side whether there is an unknown field[1] within a payload and fail to process it, but this is painful in two situations:
1) It doesn't provide for a good error message, since you can't say what the purpose of the field is. With a capability URN, the runner/SDK could say which URN it doesn't understand.
2) It doesn't allow for the addition of fields which don't impact the semantics of execution. For example, if the display data feature were being developed, a runner could ignore it and still execute the pipeline correctly.

If we think this to be common enough, we can add a capabilities list to PTransform so each PTransform can do this and has a natural way of being extended with additions that are forwards compatible.
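To make (1) concrete, here is a rough Java sketch of the unknown-field check (the validation class is hypothetical; only getUnknownFields()[1] and the RunnerApi protos are real). Note the error can only name opaque field numbers, not the purpose of the unknown feature:

    import org.apache.beam.model.pipeline.v1.RunnerApi;

    class UnknownFieldCheck {
      // Reject any ParDoPayload carrying proto fields this runner was not
      // compiled against.
      static void validate(RunnerApi.ParDoPayload payload) {
        if (!payload.getUnknownFields().asMap().isEmpty()) {
          throw new UnsupportedOperationException(
              "ParDoPayload has unknown proto fields "
                  + payload.getUnknownFields().asMap().keySet()
                  + "; cannot tell whether they affect execution semantics.");
        }
      }
    }

With a capability/requirement URN, the same check could instead report exactly which named feature is unsupported.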
The alternative to having capabilities on PTransform (and other constructs) is that we would need a new URN whenever the specification of the transform changes. For forwards-compatible changes, each SDK/runner would map older versions of the URN onto the latest, internally treat it as the latest version, and always downgrade it to the version the other party expects when communicating with it. Backwards-incompatible changes would always require a new URN, which capabilities at the PTransform level would not help with.

> > I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs since they are different paradigms than "vanilla" ParDo.
>
> Here I disagree. What about one that is both splittable and stateful? Would one have a fourth URN for that? If/when another flavor of DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in particular can be executed as a normal ParDo as long as the output is bounded.)

I agree that you could have stateful and splittable DoFns where the element is the key and you share state and timers across restrictions, but no runner is capable of executing this efficiently.

> >> > On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.
> >>
> >> Yes.

I believe capabilities also make sense on a Pipeline, where they would inform runners about new types of fields to be aware of, either within Components or on the Pipeline object itself, but for this discussion it makes sense that an environment would store most "capabilities" related to execution.

> >> > Is it sufficient for each SDK involved in a cross-language expansion to validate that it understands the inputs? For example, if Python sends a PCollection with a pickle coder to Java as input to an expansion, then it will fail. And conversely if the returned subgraph outputs a PCollection with a Java custom coder.
> >>
> >> Yes. It's possible to imagine there could be some negotiation about inserting length prefix coders (e.g. a Count transform could act on any opaque data as long as it can delimit it), but that's still TBD.

I would suggest that in situations where SDK A knows Count could be done opaquely, it define Count using a well-known type such as "bytes" and have any other SDK that uses it perform the transcoding. Not all languages have the flexibility to pass around an opaque type.

> >> > More complex use cases that I can imagine all seem futuristic and unlikely to come to pass (Python passes a pickled DoFn to the Java expansion service which inserts it into the graph in a way where a Java-based transform would have to invoke it on every element, etc.)
> >>
> >> Some transforms are configured with UDFs of this form... but we'll cross that bridge when we get to it.
> >
> >
> > Now that I think harder, I know of a TimestampFn that governs the watermark. Does SDF solve this by allowing a composite IO where the parsing is done in one language while the watermark is somehow governed by the other? And then there's writing a SQL UDF in your language of choice... Anyhow, probably a tangent...
>
> Yeah, it'd be good to support this, someday...

I think this will be a higher-level transform and not something as low-level as a ParDo, such as a Kafka transform that is parameterized by a parsing function and a watermark function, but this is still very much up for discussion.
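Returning to the URN-versioning alternative at the top of this message, a minimal sketch of the upgrade/downgrade mapping (the v2 URN here is invented purely for illustration; no such URN exists):

    import java.util.Map;
    import java.util.Set;

    class UrnVersions {
      // Hypothetical: "beam:transform:pardo:v2" is not a real URN.
      private static final Map<String, String> UPGRADES =
          Map.of("beam:transform:pardo:v1", "beam:transform:pardo:v2");
      private static final Map<String, String> DOWNGRADES =
          Map.of("beam:transform:pardo:v2", "beam:transform:pardo:v1");

      // Normalize an older URN to the latest version for internal use.
      static String toLatest(String urn) {
        return UPGRADES.getOrDefault(urn, urn);
      }

      // Downgrade to a URN the other party understands when communicating.
      static String forPeer(String urn, Set<String> peerUnderstands) {
        return peerUnderstands.contains(urn) ? urn : DOWNGRADES.getOrDefault(urn, urn);
      }
    }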
> >> > On Mon, Feb 10, 2020 at 5:03 PM Brian Hulette <[email protected]> wrote:
> >> >>
> >> >> I like the capabilities/requirements idea. Would these capabilities be at a level that it would make sense to document in the capabilities matrix? i.e. could the URNs be the values of "X" Pablo described here [1]?
> >> >>

Yes and no. Yes in that we would be able to formally enumerate this and list what each SDK and each runner supports (at specific versions). No in that being able to do something isn't the same as being able to do it well, so we would still want to have some way to say that a runner can do something well.

> >> >> Brian
> >> >>
> >> >> [1] https://lists.apache.org/thread.html/e93ac64d484551d61e559e1ba0cf4a15b760e69d74c5b1d0549ff74f%40%3Cdev.beam.apache.org%3E
> >> >>
> >> >> On Mon, Feb 10, 2020 at 3:55 PM Robert Bradshaw <[email protected]> wrote:
> >> >>>
> >> >>> With an eye towards cross-language (which includes cross-version) pipelines and services (specifically looking at Dataflow) supporting portable pipelines, there's been a desire to stabilize the portability protos. There are currently many cleanups we'd like to do [1] (some essential, others nice to have); are there others that people would like to see?
> >> >>>
> >> >>> Of course we would like it to be possible for the FnAPI and Beam itself to continue to evolve. Most of this can be handled by runners understanding various transform URNs, but not all. (An example that comes to mind is support for large iterables [2], or the requirement to observe and respect new fields on a PTransform or its payloads [3].) One proposal for this is to add capabilities and/or requirements. An environment (corresponding generally to an SDK) could advertise various capabilities (as a list or map of URNs) which a runner can take advantage of without requiring all SDKs to support all features at the same time. For the other way around, we need a way of marking something that a runner must reject if it does not understand it. This could be a set of requirements (again, a list or map of URNs) that designate capabilities required to at least be understood by the runner to faithfully execute this pipeline. (These could be attached to a transform or the pipeline itself.) Do these sound like reasonable additions? Also, would they ever need to be parameterized (map), or would a list suffice?
> >> >>>
> >> >>> [1] BEAM-2645, BEAM-2822, BEAM-3203, BEAM-3221, BEAM-3223, BEAM-3227, BEAM-3576, BEAM-3577, BEAM-3595, BEAM-4150, BEAM-4180, BEAM-4374, BEAM-5391, BEAM-5649, BEAM-8172, BEAM-8201, BEAM-8271, BEAM-8373, BEAM-8539, BEAM-8804, BEAM-9229, BEAM-9262, BEAM-9266, and BEAM-9272
> >> >>> [2] https://lists.apache.org/thread.html/70cac361b659516933c505b513d43986c25c13da59eabfd28457f1f2@%3Cdev.beam.apache.org%3E
> >> >>> [3] https://lists.apache.org/thread.html/rdc57f240069c0807eae87ed2ff13d3ee503bc18e5f906d05624e6433%40%3Cdev.beam.apache.org%3E

As for the proto clean-ups, the scope is to cover almost all things needed for execution now, and to follow up with optional transforms, payloads, and coders later, which would exclude the job management APIs and artifact staging. A formal enumeration would be useful here.
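To make the requirements proposal quoted above concrete, a sketch of what a runner-side check might look like (this assumes a repeated string "requirements" field were added to the Pipeline proto, which does not exist today, and the requirement URNs are invented):

    import java.util.Set;
    import org.apache.beam.model.pipeline.v1.RunnerApi;

    class RequirementsCheck {
      // Invented URNs for illustration only.
      private static final Set<String> SUPPORTED =
          Set.of(
              "beam:requirement:pardo:stateful:v1",
              "beam:requirement:pardo:splittable:v1");

      // Fail fast on any requirement URN this runner does not understand,
      // rather than silently mis-executing the pipeline.
      static void validate(RunnerApi.Pipeline pipeline) {
        // getRequirementsList() assumes the hypothetical field above.
        for (String urn : pipeline.getRequirementsList()) {
          if (!SUPPORTED.contains(urn)) {
            throw new UnsupportedOperationException(
                "Pipeline requires " + urn + ", which this runner does not support.");
          }
        }
      }
    }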
Also, we should provide formal guidance about adding new fields, adding new types of transforms, new types of proto messages, ... (best to describe this on a case-by-case basis as people try to modify the protos, and evolve this guidance over time).

All in all, I think "capabilities" is about informing a runner about what it should know about and what it is allowed to do. If we go with a list of "capabilities", we could always add a "parameterized capabilities" URN which would tell runners they need to also look at some other field.

I also believe capabilities should NOT be "inherited". For example, if we define capabilities on a ParDoPayload, on a PTransform, and on an Environment, then ParDoPayload capabilities shouldn't be copied to the PTransform, and PTransform-specific capabilities shouldn't be copied to the Environment. My reasoning is that some "capabilities" can only be scoped to a single ParDoPayload or a single PTransform and wouldn't apply generally everywhere. The best example I can think of is that Environment A supports progress reporting while Environment B doesn't, so it wouldn't make sense to say that the "Pipeline" supports progress reporting.

Are capabilities strictly different from "resources" (a transform needs Python package X) or "execution hints" (e.g. deploy on machines that have GPUs; some generic, but mostly runner-specific hints)? At first glance I would say yes.

1: https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/MessageOrBuilder.html#getUnknownFields--
