On Mon, May 12, 2025 at 12:08 PM Joey Tran <joey.t...@schrodinger.com> wrote:
> Does this handle the full DoFn spec (e.g. bundle start/finish, WIndowFn >>> params, state and timers, etc.)? >> >> It handles the DoFn lifecycle methods (setup/start/finish/teardown), with >> the exception that it doesn't currently handle generating elements in >> `finish_bundle`. >> >> Yeah, that seems intrinsically problematic. (I suppose it could also be >> dangerous for any DoFn that buffers elements, but to do that one would need >> to, in the general case, emit from finish bundle.) > > > Ah right... The association between input row and output rows doesn't > really make sense with elements emitted from finish_bundle. > > I'm not sure how this should be resolved. I suppose we could just throw an > error if there's anything ever yielded from finish_bundle, but we wouldn't > know this until runtime. I'm sure there's also quite a number of dofn's > that implement buffers. > Yeah, I think throwing an error on emits from finish_bundle (plus a lot of docs) is probably reasonable. It probably wouldn't be too hard (in Python at least, and maybe Java) to detect this at construction time based on the bytecode as a follow-up. On Tue, May 13, 2025 at 7:22 AM Joey Tran <joey.t...@schrodinger.com> wrote: > I'm curious if there's been any exploration of ways to generally convert a > library of non-schemad transforms (in python/java/etc) to a library of > schemad forms (e.g. to expose a YAML version of a library). > > This SchemadParDo is one way that I've been trying to do that, but isn't > exactly full proof as we see here. It also requires carrying around > possibly a lot of extra fields that aren't necessary for the underlying > DoFn. I've also considered using putting row-identifying information into > an element's window and then unwindowing and reschemaing it afterwards. > That also doesn't feel like the greatest solution. > > Are there any other ideas floating around? > There are a couple of very interesting ideas to unpack here. First, giving transforms a schema'd API to untether them from the language in which they're defined (including exposing them to YAML). This involves three parts: declaring (and perhaps adding a conversion function from) an input schema, declaring (and perhaps adding a conversion function to) the output schema, and declaring (and perhaps adding a mapping function) from a config schema to an instantiated transform. For Java often we've been doing this via lightweight wrappers as part of exposing them as external transforms (and also giving them an associated URN). For Python the construction part can be easy (if the construction parameter are already well defined types that can be passed via kwargs, and its identifier can be the fully qualified name, and we've sometimes augmented transforms to be able to accept schema'd data and/or an option to produce schema'd data (though perhaps we could be more consistent here). I'm all for making this process even more lightweight, e.g. via decorator or better utility functions, at least as a transition until the "default" version of all of these is simply the schema'd variant. The second idea here is the ability to apply a DoFn, or general mapping, to one (or possibly a small number) of fields and "pass the rest" along. One option is like you have here, allowing application of an arbitrary DoFn to a given field. (As an aside, DoFns can be rather heavyweight; it'd be nice to accept an arbitrary callable, which is the motivation for DoFnParams like https://github.com/apache/beam/blob/master/sdks/python/apache_beam/pipeline_test.py#L848 if we can make these DoFnParams transitive). We could also make it easier to augment a Row with new field derived from their old fields (though this doesn't help adapting existing Fn). The utility you've proposed here would be a great addition. The third idea I see (though highly related) is the ability to pass arbitrary metadata through a PCollection transparently, just as Windows are passed. This is actually something that I've talked to people working on Flume with as well. One complication is what to do with aggregations. (Is there some way of automatically aggregating them if a GBK, Combine, or Join is encountered? What about side inputs?) Some metadata can be ignored, but other kinds (e.g. lineage or privacy information) should not be dropped lest the "extractor" who tries ot read this downstream interprets the lack of metadata incorrectly. Here one also has the buffering issues (though if augmented emitters to somehow be associated with their input element, or followed the pattern of using WindowedValue.withValue rather than creating them whole cloth, that could possibly be mitigated). I don't think we can boil the ocean and solve all of these at once, but it's definitely a direction I'd like to see us move towards. (FWIW, one of the efforts I'd like to see with Beam 3.0 is a move to a "schema'd first" world. Good data interoperability is key to finishing out the portability/cross language story, and frankly I think it makes for a better experience in-language as well much of the time. This will involve both ensuring we have schema'd versions of key transforms (including IO) and a revamp of our documentation.) > On Mon, May 12, 2025 at 3:08 PM Joey Tran <joey.t...@schrodinger.com> > wrote: > >> Does this handle the full DoFn spec (e.g. bundle start/finish, WIndowFn >>>> params, state and timers, etc.)? >>> >>> It handles the DoFn lifecycle methods (setup/start/finish/teardown), >>> with the exception that it doesn't currently handle generating elements in >>> `finish_bundle`. >>> >>> Yeah, that seems intrinsically problematic. (I suppose it could also be >>> dangerous for any DoFn that buffers elements, but to do that one would need >>> to, in the general case, emit from finish bundle.) >> >> >> Ah right... The association between input row and output rows doesn't >> really make sense with elements emitted from finish_bundle. >> >> I'm not sure how this should be resolved. I suppose we could just throw >> an error if there's anything ever yielded from finish_bundle, but we >> wouldn't know this until runtime. I'm sure there's also quite a number of >> dofn's that implement buffers. >> >> On Mon, May 12, 2025 at 11:30 AM Robert Bradshaw <rober...@waymo.com> >> wrote: >> >>> On Mon, May 12, 2025 at 8:25 AM Joey Tran <joey.t...@schrodinger.com> >>> wrote: >>> >>>> and preserve rather than elide the input field (at least as an option). >>>>> >>>> >>>> Oh yes, that is actually the currently implemented behavior, I just >>>> forgot to type the input field in my example (probably shouldn't have tried >>>> to do it on mobile) >>>> >>> >>> +1 >>> >>> >>>> Does this handle the full DoFn spec (e.g. bundle start/finish, >>>>> WIndowFn params, state and timers, etc.)? >>>> >>>> It handles the DoFn lifecycle methods (setup/start/finish/teardown), >>>> with the exception that it doesn't currently handle generating elements in >>>> `finish_bundle`. >>>> >>> >>> Yeah, that seems intrinsically problematic. (I suppose it could also be >>> dangerous for any DoFn that buffers elements, but to do that one would need >>> to, in the general case, emit from finish bundle.) >>> >>> >>>> It also doesn't currently handle any params, side inputs, state, >>>> timers, or batching (though the implementation doesn't preclude >>>> implementing those later afaict) >>>> >>> >>> Does it detect/nicely fail in such cases? For other reasons, I've been >>> thinking it would be nice to figure out a general framework to "forward" >>> such arguments. >>> >>> It does handle tagged pcollections and validating the input pcollection >>>> (does the input pcollection define the input field and is the input field >>>> type consistent with the DoFn.process type hint?) and creating the correct >>>> output type hint for the output pcollection (same as the input schema but >>>> with a new or overwritten output field as the first field with field type >>>> inferred from DoFn.process) >>>> >>> >>> Nice. >>> >>> This certainly seems a worthwhile addition. >>> >>> >>>> On Mon, May 12, 2025 at 11:14 AM Robert Bradshaw <rober...@waymo.com> >>>> wrote: >>>> >>>>> On Sun, May 11, 2025 at 7:35 PM Reuven Lax via dev < >>>>> dev@beam.apache.org> wrote: >>>>> >>>>>> My first thought is that this should go in contrib for now. >>>>>> >>>>>> BTW in the Java SDK, field access is integrated directly into ParDo. >>>>>> e.g. you can write >>>>>> >>>>>> new DoFn<> { >>>>>> @ProcessElement >>>>>> public void process(@FieldAccess("field1") Type1 >>>>>> field1, @FieldAccess("field2") Type2 field2) { >>>>>> ... >>>>>> } >>>>>> } >>>>>> >>>>>> It also supports selecting wildcards (e.g. @FieldAccess("top.*")). >>>>>> >>>>> >>>>> BTW, how is this different than @FieldAccess("top")? >>>>> >>>>> >>>>>> I'm not sure how this pattern would translate into the Python SDK >>>>>> though. >>>>>> >>>>> >>>>> It would probably look like >>>>> >>>>> def process(field1=FieldAccess("field1"), ...) >>>>> >>>>> though in Python there's much less need as Row objects are not as >>>>> cumbersome to use, e.g. >>>>> >>>>> def process(row): >>>>> # access row.field1 directly here >>>>> >>>>> though it could be useful for optimizations like projection lifting. >>>>> >>>>> >>>>> As for the original question, the value here is being able to adapt a >>>>> DoFn<T, O> to apply to a single field of a Row? This certainly seems to >>>>> have value. I might suggest a syntax like >>>>> >>>>> schema_pcoll | DoToField(SomeDoFn(), input_field="element", >>>>> output_field="word") >>>>> >>>>> and preserve rather than elide the input field (at least as an >>>>> option). >>>>> >>>>> Does this handle the full DoFn spec (e.g. bundle start/finish, >>>>> WIndowFn params, state and timers, etc.)? >>>>> >>>>> On Sat, May 10, 2025 at 3:35 AM Joey Tran <joey.t...@schrodinger.com> >>>>>> wrote: >>>>>> >>>>>>> Not currently >>>>>>> >>>>>>> On Sat, May 10, 2025, 12:48 AM Reuven Lax <re...@google.com> wrote: >>>>>>> >>>>>>>> Does this work with nested fields? Can you specify >>>>>>>> Input_field="a.b.c"? >>>>>>>> >>>>>>>> On Fri, May 9, 2025 at 7:18 PM Joey Tran <joey.t...@schrodinger.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Sure! >>>>>>>>> >>>>>>>>> Given a DoFn that has... >>>>>>>>> >>>>>>>>> def process(self, sentence): >>>>>>>>> yield from sentence.split() >>>>>>>>> >>>>>>>>> >>>>>>>>> You could use it with SchemadParDo as: >>>>>>>>> >>>>>>>>> (p | beam.Create([pvalue.Row(element="hello world", id="id")]) >>>>>>>>> | SchemadParDo(SchemadParDo(SplitSentenceDoFn(), >>>>>>>>> input_field="element", output_field="word")) >>>>>>>>> >>>>>>>>> And it'd produce Row(word="hello", id="id") and Row(word=""world", >>>>>>>>> id="id") >>>>>>>>> >>>>>>>>> On Fri, May 9, 2025, 9:57 PM Reuven Lax via dev < >>>>>>>>> dev@beam.apache.org> wrote: >>>>>>>>> >>>>>>>>>> Can you explain a bit how SchemadParDo works? >>>>>>>>>> >>>>>>>>>> On Fri, May 9, 2025 at 4:49 PM Joey Tran < >>>>>>>>>> joey.t...@schrodinger.com> wrote: >>>>>>>>>> >>>>>>>>>>> I've written a `SchemadParDo(input_field: str, output_field, >>>>>>>>>>> dofn:DoFn)` transform for more easily writing a Schemad transform >>>>>>>>>>> given a >>>>>>>>>>> DoFn. >>>>>>>>>>> >>>>>>>>>>> Is this something worth upstreaming into the Beam Python SDK? I >>>>>>>>>>> wrote it to make it easier to convert our current set of dofn's into >>>>>>>>>>> schemad dofns for use with the YAML SDK. Just wanted to gauge >>>>>>>>>>> interest >>>>>>>>>>> before setting up the dev env again >>>>>>>>>>> >>>>>>>>>>