On Mon, May 12, 2025 at 8:25 AM Joey Tran <joey.t...@schrodinger.com> wrote:

>> and preserve rather than elide the input field (at least as an option).
>>
>
> Oh yes, that is actually the currently implemented behavior, I just forgot
> to type the input field in my example (probably shouldn't have tried to do
> it on mobile)
>

+1


>> Does this handle the full DoFn spec (e.g. bundle start/finish, WindowFn
>> params, state and timers, etc.)?
>
> It handles the DoFn lifecycle methods (setup/start/finish/teardown), with
> the exception that it doesn't currently handle generating elements in
> `finish_bundle`.
>

Yeah, that seems intrinsically problematic. (I suppose it could also be
dangerous for any DoFn that buffers elements, but to do that one would need
to, in the general case, emit from finish bundle.)


> It also doesn't currently handle any params, side inputs, state, timers,
> or batching (though the implementation doesn't preclude implementing those
> later afaict)
>

Does it detect/nicely fail in such cases? For other reasons, I've been
thinking it would be nice to figure out a general framework to "forward"
such arguments.


> It does handle tagged pcollections and validating the input pcollection
> (does the input pcollection define the input field and is the input field
> type consistent with the DoFn.process type hint?) and creating the correct
> output type hint for the output pcollection (same as the input schema but
> with a new or overwritten output field as the first field with field type
> inferred from DoFn.process)
>

Nice. This certainly seems a worthwhile addition.


> On Mon, May 12, 2025 at 11:14 AM Robert Bradshaw <rober...@waymo.com>
> wrote:
>
>> On Sun, May 11, 2025 at 7:35 PM Reuven Lax via dev <dev@beam.apache.org>
>> wrote:
>>
>>> My first thought is that this should go in contrib for now.
>>>
>>> BTW in the Java SDK, field access is integrated directly into ParDo.
>>> e.g. you can write
>>>
>>> new DoFn<> {
>>>   @ProcessElement
>>>   public void process(@FieldAccess("field1") Type1 field1,
>>>                       @FieldAccess("field2") Type2 field2) {
>>>     ...
>>>   }
>>> }
>>>
>>> It also supports selecting wildcards (e.g. @FieldAccess("top.*")).
>>>
>>
>> BTW, how is this different than @FieldAccess("top")?
>>
>>
>>> I'm not sure how this pattern would translate into the Python SDK
>>> though.
>>>
>>
>> It would probably look like
>>
>>     def process(field1=FieldAccess("field1"), ...)
>>
>> though in Python there's much less need as Row objects are not as
>> cumbersome to use, e.g.
>>
>>     def process(row):
>>       # access row.field1 directly here
>>
>> though it could be useful for optimizations like projection lifting.
>>
>>
>> As for the original question, the value here is being able to adapt a
>> DoFn<T, O> to apply to a single field of a Row? This certainly seems to
>> have value. I might suggest a syntax like
>>
>>     schema_pcoll | DoToField(SomeDoFn(), input_field="element",
>>                              output_field="word")
>>
>> and preserve rather than elide the input field (at least as an option).
>>
>> Does this handle the full DoFn spec (e.g. bundle start/finish, WindowFn
>> params, state and timers, etc.)?
>>
>>> On Sat, May 10, 2025 at 3:35 AM Joey Tran <joey.t...@schrodinger.com>
>>> wrote:
>>>
>>>> Not currently
>>>>
>>>> On Sat, May 10, 2025, 12:48 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Does this work with nested fields? Can you specify input_field="a.b.c"?
>>>>>
>>>>> On Fri, May 9, 2025 at 7:18 PM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Sure!
>>>>>>
>>>>>> Given a DoFn that has...
>>>>>>
>>>>>>     def process(self, sentence):
>>>>>>         yield from sentence.split()
>>>>>>
>>>>>>
>>>>>> You could use it with SchemadParDo as:
>>>>>>
>>>>>>     (p | beam.Create([pvalue.Row(element="hello world", id="id")])
>>>>>>        | SchemadParDo(SplitSentenceDoFn(),
>>>>>>                       input_field="element", output_field="word"))
>>>>>>
>>>>>> And it'd produce Row(word="hello", id="id") and Row(word="world",
>>>>>> id="id")
>>>>>>
>>>>>> On Fri, May 9, 2025, 9:57 PM Reuven Lax via dev <dev@beam.apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Can you explain a bit how SchemadParDo works?
>>>>>>>
>>>>>>> On Fri, May 9, 2025 at 4:49 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I've written a `SchemadParDo(input_field: str, output_field,
>>>>>>>> dofn:DoFn)` transform for more easily writing a Schemad transform
>>>>>>>> given a DoFn.
>>>>>>>>
>>>>>>>> Is this something worth upstreaming into the Beam Python SDK? I
>>>>>>>> wrote it to make it easier to convert our current set of DoFns into
>>>>>>>> schemad DoFns for use with the YAML SDK. Just wanted to gauge interest
>>>>>>>> before setting up the dev env again.
>>>>>>>>
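
A minimal plain-Python sketch of the per-row behavior discussed above: run a DoFn-style `process` on one field of each row, emit one output row per result, put the output field first, and keep the input field. Dicts stand in for Beam Rows here, and `do_to_field` and `split_sentence` are hypothetical names used only for illustration. This is not the SchemadParDo implementation and it calls no Beam APIs.

```python
from typing import Any, Callable, Dict, Iterable, Iterator

# Assumption: a plain dict stands in for a schema'd Beam Row.
Row = Dict[str, Any]


def do_to_field(
    process: Callable[[Any], Iterable[Any]],
    rows: Iterable[Row],
    input_field: str,
    output_field: str,
) -> Iterator[Row]:
    """Apply `process` to one field of each row and fan out the results."""
    for row in rows:
        # Fail early if the row does not define the requested input field.
        if input_field not in row:
            raise KeyError(f"input field {input_field!r} not in row {row!r}")
        for value in process(row[input_field]):
            # The output field comes first and overwrites any existing
            # field of the same name; all other fields are preserved.
            out = {output_field: value}
            out.update((k, v) for k, v in row.items() if k != output_field)
            yield out


def split_sentence(sentence: str) -> Iterator[str]:
    # Same body as the process method quoted in the thread.
    yield from sentence.split()


rows = [{"element": "hello world", "id": "id"}]
result = list(do_to_field(split_sentence, rows, "element", "word"))
```

With the sample row, `result` holds two rows with fields `word`, `element`, `id`, one for "hello" and one for "world". Lifecycle methods, side inputs, state, timers, and type-hint validation are out of scope for this sketch.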