>>> Does this handle the full DoFn spec (e.g. bundle start/finish, WindowFn
>>> params, state and timers, etc.)?
>>
>> It handles the DoFn lifecycle methods (setup/start/finish/teardown), with
>> the exception that it doesn't currently handle generating elements in
>> `finish_bundle`.
>
> Yeah, that seems intrinsically problematic. (I suppose it could also be
> dangerous for any DoFn that buffers elements, but to do that one would need
> to, in the general case, emit from finish bundle.)
Ah right... The association between input rows and output rows doesn't
really make sense for elements emitted from finish_bundle. I'm not sure how
this should be resolved. I suppose we could just throw an error if anything
is ever yielded from finish_bundle, but we wouldn't know that until runtime.
I'm sure there are also quite a number of DoFns that implement buffers.

On Mon, May 12, 2025 at 11:30 AM Robert Bradshaw <rober...@waymo.com> wrote:

> On Mon, May 12, 2025 at 8:25 AM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>>> and preserve rather than elide the input field (at least as an option).
>>
>> Oh yes, that is actually the currently implemented behavior, I just
>> forgot to type the input field in my example (probably shouldn't have tried
>> to do it on mobile)
>
> +1
>
>>> Does this handle the full DoFn spec (e.g. bundle start/finish, WindowFn
>>> params, state and timers, etc.)?
>>
>> It handles the DoFn lifecycle methods (setup/start/finish/teardown), with
>> the exception that it doesn't currently handle generating elements in
>> `finish_bundle`.
>
> Yeah, that seems intrinsically problematic. (I suppose it could also be
> dangerous for any DoFn that buffers elements, but to do that one would need
> to, in the general case, emit from finish bundle.)
>
>> It also doesn't currently handle any params, side inputs, state, timers,
>> or batching (though the implementation doesn't preclude implementing those
>> later afaict)
>
> Does it detect/nicely fail in such cases? For other reasons, I've been
> thinking it would be nice to figure out a general framework to "forward"
> such arguments.
>
>> It does handle tagged pcollections and validating the input pcollection
>> (does the input pcollection define the input field, and is the input field
>> type consistent with the DoFn.process type hint?)
>> and creating the correct output type hint for the output pcollection
>> (same as the input schema but with a new or overwritten output field as
>> the first field, with its field type inferred from DoFn.process)
>
> Nice.
>
> This certainly seems a worthwhile addition.
>
>> On Mon, May 12, 2025 at 11:14 AM Robert Bradshaw <rober...@waymo.com>
>> wrote:
>>
>>> On Sun, May 11, 2025 at 7:35 PM Reuven Lax via dev <dev@beam.apache.org>
>>> wrote:
>>>
>>>> My first thought is that this should go in contrib for now.
>>>>
>>>> BTW in the Java SDK, field access is integrated directly into ParDo.
>>>> E.g. you can write
>>>>
>>>>     new DoFn<>() {
>>>>       @ProcessElement
>>>>       public void process(@FieldAccess("field1") Type1 field1,
>>>>                           @FieldAccess("field2") Type2 field2) {
>>>>         ...
>>>>       }
>>>>     }
>>>>
>>>> It also supports selecting wildcards (e.g. @FieldAccess("top.*")).
>>>
>>> BTW, how is this different than @FieldAccess("top")?
>>>
>>>> I'm not sure how this pattern would translate into the Python SDK
>>>> though.
>>>
>>> It would probably look like
>>>
>>>     def process(field1=FieldAccess("field1"), ...)
>>>
>>> though in Python there's much less need as Row objects are not as
>>> cumbersome to use, e.g.
>>>
>>>     def process(row):
>>>         # access row.field1 directly here
>>>
>>> though it could be useful for optimizations like projection lifting.
>>>
>>> As for the original question, the value here is being able to adapt a
>>> DoFn<T, O> to apply to a single field of a Row? This certainly seems to
>>> have value. I might suggest a syntax like
>>>
>>>     schema_pcoll | DoToField(SomeDoFn(), input_field="element",
>>>                              output_field="word")
>>>
>>> and preserve rather than elide the input field (at least as an option).
>>>
>>> Does this handle the full DoFn spec (e.g. bundle start/finish, WindowFn
>>> params, state and timers, etc.)?
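A rough sketch of the schema rule Joey describes: validate that the input field exists and matches the `process()` parameter hint, then build the output schema with the output field first (overwriting any existing field of that name) and every other field preserved. This is an illustration under assumptions, not the SchemadParDo/DoToField code: schemas are modeled as plain dicts of field name to Python type instead of Beam schemas, the type check is strict identity (Beam's own type compatibility is more nuanced), `process()` is assumed to be annotated as returning `Iterable[T]`, and `derive_output_schema` is a hypothetical helper name.

```python
import inspect
from typing import Any, Dict, Iterable, get_args, get_type_hints


class SplitSentenceDoFn:
    """Stand-in for the thread's example DoFn (no Beam dependency)."""

    def process(self, sentence: str) -> Iterable[str]:
        yield from sentence.split()


def derive_output_schema(dofn: Any, input_schema: Dict[str, type],
                         input_field: str, output_field: str) -> Dict[str, type]:
    """Hypothetical helper: validate input_field against process()'s type
    hints and return the output schema (output field first)."""
    hints = get_type_hints(type(dofn).process)
    # First parameter of the bound process() method (self is already bound).
    param = next(iter(inspect.signature(dofn.process).parameters))

    # Check 1: the input schema must define the input field.
    if input_field not in input_schema:
        raise ValueError(f"input schema has no field {input_field!r}")

    # Check 2: the field's type must match process()'s parameter hint, if any.
    expected = hints.get(param, Any)
    actual = input_schema[input_field]
    if expected is not Any and actual is not expected:
        raise TypeError(f"field {input_field!r} has type {actual!r}, "
                        f"but process() expects {expected!r}")

    # Output element type: T from an Iterable[T]-style return hint, else Any.
    args = get_args(hints.get("return", Any))
    output_type = args[0] if args else Any

    # Output field goes first; a same-named input field is overwritten, and
    # all other input fields (including input_field itself) are preserved.
    schema = {output_field: output_type}
    schema.update((k, v) for k, v in input_schema.items() if k != output_field)
    return schema


print(derive_output_schema(SplitSentenceDoFn(), {"element": str, "id": str},
                           "element", "word"))
# {'word': <class 'str'>, 'element': <class 'str'>, 'id': <class 'str'>}
```

Putting the output field first and keeping the input field mirrors what the thread says the implementation does; an option to elide the input field, as Robert suggests, would just be one more filter in the final `update`.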
>>>
>>>> On Sat, May 10, 2025 at 3:35 AM Joey Tran <joey.t...@schrodinger.com>
>>>> wrote:
>>>>
>>>>> Not currently
>>>>>
>>>>> On Sat, May 10, 2025, 12:48 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Does this work with nested fields? Can you specify
>>>>>> input_field="a.b.c"?
>>>>>>
>>>>>> On Fri, May 9, 2025 at 7:18 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Sure!
>>>>>>>
>>>>>>> Given a DoFn that has...
>>>>>>>
>>>>>>>     def process(self, sentence):
>>>>>>>         yield from sentence.split()
>>>>>>>
>>>>>>> You could use it with SchemadParDo as:
>>>>>>>
>>>>>>>     (p | beam.Create([pvalue.Row(element="hello world", id="id")])
>>>>>>>        | SchemadParDo(SplitSentenceDoFn(),
>>>>>>>                       input_field="element", output_field="word"))
>>>>>>>
>>>>>>> And it'd produce Row(word="hello", id="id") and Row(word="world",
>>>>>>> id="id")
>>>>>>>
>>>>>>> On Fri, May 9, 2025, 9:57 PM Reuven Lax via dev <dev@beam.apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Can you explain a bit how SchemadParDo works?
>>>>>>>>
>>>>>>>> On Fri, May 9, 2025 at 4:49 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I've written a `SchemadParDo(input_field: str, output_field,
>>>>>>>>> dofn: DoFn)` transform for more easily writing a Schemad transform
>>>>>>>>> from a DoFn.
>>>>>>>>>
>>>>>>>>> Is this something worth upstreaming into the Beam Python SDK? I
>>>>>>>>> wrote it to make it easier to convert our current set of DoFns into
>>>>>>>>> Schemad DoFns for use with the YAML SDK. Just wanted to gauge
>>>>>>>>> interest before setting up the dev env again.
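To make the row-level behavior of the SplitSentence example concrete, here is a plain-Python emulation of a single bundle with no Beam dependency. It is a hypothetical sketch, not Beam's execution model: rows are plain dicts rather than `beam.Row`, one call runs setup/start_bundle/finish_bundle/teardown around exactly one bundle (a real runner chooses bundle boundaries and reuses DoFn instances across bundles), and `run_bundle_on_field` and `FinishBundleOutputError` are illustrative names. It keeps the input field, which the thread says the implementation already does, and it raises if `finish_bundle` emits anything. That is the runtime-only check floated in the thread, and it is why a buffering DoFn cannot be adapted this way.

```python
from typing import Any, Dict, Iterable, List

_MISSING = object()  # sentinel so a yielded None still counts as output


class SplitSentenceDoFn:
    """The thread's example DoFn, written without a Beam dependency."""

    def process(self, sentence: str) -> Iterable[str]:
        yield from sentence.split()


class FinishBundleOutputError(RuntimeError):
    """Raised when finish_bundle emits output (no input row to attach it to)."""


def run_bundle_on_field(dofn: Any, rows: Iterable[Dict[str, Any]],
                        input_field: str,
                        output_field: str) -> List[Dict[str, Any]]:
    """Hypothetical emulation of one bundle: apply dofn.process to a single
    field of each row and keep the row's other fields."""
    for hook in ("setup", "start_bundle"):
        if hasattr(dofn, hook):
            getattr(dofn, hook)()
    try:
        out = []
        for row in rows:
            # process() may be a generator, return an iterable, or return None.
            for value in dofn.process(row[input_field]) or ():
                new_row = {output_field: value}
                new_row.update(
                    (k, v) for k, v in row.items() if k != output_field)
                out.append(new_row)
        if hasattr(dofn, "finish_bundle"):
            emitted = dofn.finish_bundle()
            # Only detectable at runtime: pull at most one element to check.
            if emitted is not None and next(iter(emitted), _MISSING) is not _MISSING:
                raise FinishBundleOutputError(
                    "finish_bundle emitted output; it cannot be mapped back "
                    "to an input row")
        return out
    finally:
        if hasattr(dofn, "teardown"):
            dofn.teardown()


rows = [{"element": "hello world", "id": "id"}]
for r in run_bundle_on_field(SplitSentenceDoFn(), rows, "element", "word"):
    print(r)
# {'word': 'hello', 'element': 'hello world', 'id': 'id'}
# {'word': 'world', 'element': 'hello world', 'id': 'id'}
```

A DoFn that appends to a buffer in `process` and yields it from `finish_bundle` produces no per-row output here and then trips `FinishBundleOutputError`, which matches the concern about buffering DoFns.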