I'm curious whether there's been any exploration of ways to generally convert a library of non-schemad transforms (in Python/Java/etc.) to a library of schemad forms (e.g. to expose a YAML version of a library).
This SchemadParDo is one way I've been trying to do that, but it isn't exactly foolproof, as we see here. It also requires carrying around possibly a lot of extra fields that aren't necessary for the underlying DoFn.

I've also considered putting row-identifying information into an element's window and then unwindowing and reschemaing it afterwards. That also doesn't feel like the greatest solution.

Are there any other ideas floating around?

On Mon, May 12, 2025 at 3:08 PM Joey Tran <joey.t...@schrodinger.com> wrote:

>>> Does this handle the full DoFn spec (e.g. bundle start/finish, WindowFn
>>> params, state and timers, etc.)?
>>
>> It handles the DoFn lifecycle methods (setup/start/finish/teardown), with
>> the exception that it doesn't currently handle generating elements in
>> `finish_bundle`.
>>
>> Yeah, that seems intrinsically problematic. (I suppose it could also be
>> dangerous for any DoFn that buffers elements, but to do that one would need
>> to, in the general case, emit from finish bundle.)
>
> Ah right... The association between input row and output rows doesn't
> really make sense with elements emitted from finish_bundle.
>
> I'm not sure how this should be resolved. I suppose we could just throw an
> error if there's anything ever yielded from finish_bundle, but we wouldn't
> know this until runtime. I'm sure there's also quite a number of DoFns
> that implement buffers.
>
> On Mon, May 12, 2025 at 11:30 AM Robert Bradshaw <rober...@waymo.com>
> wrote:
>
>> On Mon, May 12, 2025 at 8:25 AM Joey Tran <joey.t...@schrodinger.com>
>> wrote:
>>
>>>> and preserve rather than elide the input field (at least as an option).
>>>
>>> Oh yes, that is actually the currently implemented behavior, I just
>>> forgot to type the input field in my example (probably shouldn't have tried
>>> to do it on mobile)
>>
>> +1
>>
>>>> Does this handle the full DoFn spec (e.g. bundle start/finish, WindowFn
>>>> params, state and timers, etc.)?
>>>
>>> It handles the DoFn lifecycle methods (setup/start/finish/teardown),
>>> with the exception that it doesn't currently handle generating elements in
>>> `finish_bundle`.
>>
>> Yeah, that seems intrinsically problematic. (I suppose it could also be
>> dangerous for any DoFn that buffers elements, but to do that one would need
>> to, in the general case, emit from finish bundle.)
>>
>>> It also doesn't currently handle any params, side inputs, state, timers,
>>> or batching (though the implementation doesn't preclude implementing those
>>> later afaict)
>>
>> Does it detect/nicely fail in such cases? For other reasons, I've been
>> thinking it would be nice to figure out a general framework to "forward"
>> such arguments.
>>
>>> It does handle tagged pcollections and validating the input pcollection
>>> (does the input pcollection define the input field and is the input field
>>> type consistent with the DoFn.process type hint?) and creating the correct
>>> output type hint for the output pcollection (same as the input schema but
>>> with a new or overwritten output field as the first field with field type
>>> inferred from DoFn.process)
>>
>> Nice.
>>
>> This certainly seems a worthwhile addition.
>>
>>> On Mon, May 12, 2025 at 11:14 AM Robert Bradshaw <rober...@waymo.com>
>>> wrote:
>>>
>>>> On Sun, May 11, 2025 at 7:35 PM Reuven Lax via dev <dev@beam.apache.org>
>>>> wrote:
>>>>
>>>>> My first thought is that this should go in contrib for now.
>>>>>
>>>>> BTW in the Java SDK, field access is integrated directly into ParDo.
>>>>> e.g. you can write
>>>>>
>>>>>     new DoFn<>() {
>>>>>       @ProcessElement
>>>>>       public void process(@FieldAccess("field1") Type1 field1,
>>>>>                           @FieldAccess("field2") Type2 field2) {
>>>>>         ...
>>>>>       }
>>>>>     }
>>>>>
>>>>> It also supports selecting wildcards (e.g. @FieldAccess("top.*")).
>>>>
>>>> BTW, how is this different than @FieldAccess("top")?
>>>>
>>>>> I'm not sure how this pattern would translate into the Python SDK
>>>>> though.
>>>>
>>>> It would probably look like
>>>>
>>>>     def process(field1=FieldAccess("field1"), ...)
>>>>
>>>> though in Python there's much less need as Row objects are not as
>>>> cumbersome to use, e.g.
>>>>
>>>>     def process(row):
>>>>         # access row.field1 directly here
>>>>
>>>> though it could be useful for optimizations like projection lifting.
>>>>
>>>> As for the original question, the value here is being able to adapt a
>>>> DoFn<T, O> to apply to a single field of a Row? This certainly seems to
>>>> have value. I might suggest a syntax like
>>>>
>>>>     schema_pcoll | DoToField(SomeDoFn(), input_field="element",
>>>>                              output_field="word")
>>>>
>>>> and preserve rather than elide the input field (at least as an option).
>>>>
>>>> Does this handle the full DoFn spec (e.g. bundle start/finish, WindowFn
>>>> params, state and timers, etc.)?
>>>>
>>>>> On Sat, May 10, 2025 at 3:35 AM Joey Tran <joey.t...@schrodinger.com>
>>>>> wrote:
>>>>>
>>>>>> Not currently
>>>>>>
>>>>>> On Sat, May 10, 2025, 12:48 AM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Does this work with nested fields? Can you specify
>>>>>>> input_field="a.b.c"?
>>>>>>>
>>>>>>> On Fri, May 9, 2025 at 7:18 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Sure!
>>>>>>>>
>>>>>>>> Given a DoFn that has...
>>>>>>>>
>>>>>>>>     def process(self, sentence):
>>>>>>>>         yield from sentence.split()
>>>>>>>>
>>>>>>>> You could use it with SchemadParDo as:
>>>>>>>>
>>>>>>>>     (p | beam.Create([pvalue.Row(element="hello world", id="id")])
>>>>>>>>        | SchemadParDo(SplitSentenceDoFn(),
>>>>>>>>                       input_field="element", output_field="word"))
>>>>>>>>
>>>>>>>> And it'd produce Row(word="hello", id="id") and Row(word="world",
>>>>>>>> id="id")
>>>>>>>>
>>>>>>>> On Fri, May 9, 2025, 9:57 PM Reuven Lax via dev <
>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> Can you explain a bit how SchemadParDo works?
>>>>>>>>>
>>>>>>>>> On Fri, May 9, 2025 at 4:49 PM Joey Tran <
>>>>>>>>> joey.t...@schrodinger.com> wrote:
>>>>>>>>>
>>>>>>>>>> I've written a `SchemadParDo(input_field: str, output_field,
>>>>>>>>>> dofn: DoFn)` transform for more easily writing a Schemad transform
>>>>>>>>>> given a DoFn.
>>>>>>>>>>
>>>>>>>>>> Is this something worth upstreaming into the Beam Python SDK? I
>>>>>>>>>> wrote it to make it easier to convert our current set of DoFns into
>>>>>>>>>> schemad DoFns for use with the YAML SDK. Just wanted to gauge
>>>>>>>>>> interest before setting up the dev env again
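
For concreteness, here is a rough, Beam-free sketch of the per-field wrapping discussed in this thread. It is not the actual SchemadParDo implementation. `FieldWrappedDoFn` is a hypothetical name, rows are modelled as plain dicts rather than Beam Rows, and only `process` and `finish_bundle` are covered. It assumes the behavior described above: the output field goes first, the remaining input fields are carried along, and anything yielded from `finish_bundle` is rejected at runtime.

```python
from typing import Any, Dict, Iterable, Iterator, Optional

# Stand-in for a Beam Row: a plain dict keeps this sketch dependency-free.
Row = Dict[str, Any]


class SplitSentenceDoFn:
    """Plain-Python stand-in for the non-schemad DoFn from the thread."""

    def process(self, sentence: str) -> Iterator[str]:
        yield from sentence.split()

    def finish_bundle(self) -> Optional[Iterable[Any]]:
        return None  # emits nothing at bundle end


class FieldWrappedDoFn:
    """Hypothetical wrapper: runs `dofn` on one field of each input row."""

    def __init__(self, dofn: Any, input_field: str, output_field: str) -> None:
        self._dofn = dofn
        self._input_field = input_field
        self._output_field = output_field

    def process(self, row: Row) -> Iterator[Row]:
        for value in self._dofn.process(row[self._input_field]):
            # The output field comes first; every other input field is carried
            # along, and an existing field with the same name is overwritten.
            out: Row = {self._output_field: value}
            for name, field_value in row.items():
                if name != self._output_field:
                    out[name] = field_value
            yield out

    def finish_bundle(self) -> None:
        # Anything emitted here has no input row to attach to, so this sketch
        # fails at runtime instead of guessing.
        leftovers = self._dofn.finish_bundle()
        for _ in leftovers or ():
            raise RuntimeError(
                "wrapped DoFn emitted from finish_bundle; no input row to attach to"
            )


wrapped = FieldWrappedDoFn(
    SplitSentenceDoFn(), input_field="element", output_field="word"
)
rows = list(wrapped.process({"element": "hello world", "id": "id"}))
wrapped.finish_bundle()  # no error: SplitSentenceDoFn emits nothing at bundle end
```

Here `rows` holds one dict per word, each with `word` first followed by the original `element` and `id` fields. A buffering DoFn that yields from `finish_bundle` would trip the `RuntimeError`, which is the runtime-only detection mentioned above.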