Does this handle the full DoFn spec (e.g. bundle start/finish, WIndowFn
>> params, state and timers, etc.)?
>
> It handles the DoFn lifecycle methods (setup/start/finish/teardown), with
> the exception that it doesn't currently handle generating elements in
> `finish_bundle`.
>
> Yeah, that seems intrinsically problematic. (I suppose it could also be
> dangerous for any DoFn that buffers elements, but to do that one would need
> to, in the general case, emit from finish bundle.)


Ah right... The association between input row and output rows doesn't
really make sense with elements emitted from finish_bundle.

I'm not sure how this should be resolved. I suppose we could just throw an
error if there's anything ever yielded from finish_bundle, but we wouldn't
know this until runtime. I'm sure there's also quite a number of dofn's
that implement buffers.

On Mon, May 12, 2025 at 11:30 AM Robert Bradshaw <rober...@waymo.com> wrote:

> On Mon, May 12, 2025 at 8:25 AM Joey Tran <joey.t...@schrodinger.com>
> wrote:
>
>> and preserve rather than elide the input field (at least as an option).
>>>
>>
>> Oh yes, that is actually the currently implemented behavior, I just
>> forgot to type the input field in my example (probably shouldn't have tried
>> to do it on mobile)
>>
>
> +1
>
>
>>  Does this handle the full DoFn spec (e.g. bundle start/finish, WIndowFn
>>> params, state and timers, etc.)?
>>
>> It handles the DoFn lifecycle methods (setup/start/finish/teardown), with
>> the exception that it doesn't currently handle generating elements in
>> `finish_bundle`.
>>
>
> Yeah, that seems intrinsically problematic. (I suppose it could also be
> dangerous for any DoFn that buffers elements, but to do that one would need
> to, in the general case, emit from finish bundle.)
>
>
>> It also doesn't currently handle any params, side inputs, state, timers,
>> or batching (though the implementation doesn't preclude implementing those
>> later afaict)
>>
>
> Does it detect/nicely fail in such cases? For other reasons, I've been
> thinking it would be nice to figure out a general framework to "forward"
> such arguments.
>
> It does handle tagged pcollections and validating the input pcollection
>> (does the input pcollection define the input field and is the input field
>> type consistent with the DoFn.process type hint?) and creating the correct
>> output type hint for the output pcollection (same as the input schema but
>> with a new or overwritten output field as the first field with field type
>> inferred from DoFn.process)
>>
>
> Nice.
>
> This certainly seems a worthwhile addition.
>
>
>> On Mon, May 12, 2025 at 11:14 AM Robert Bradshaw <rober...@waymo.com>
>> wrote:
>>
>>> On Sun, May 11, 2025 at 7:35 PM Reuven Lax via dev <dev@beam.apache.org>
>>> wrote:
>>>
>>>> My first thought is that this should go in contrib for now.
>>>>
>>>> BTW in the Java SDK, field access is integrated directly into ParDo.
>>>> e.g. you can write
>>>>
>>>> new DoFn<> {
>>>>    @ProcessElement
>>>>    public void process(@FieldAccess("field1") Type1
>>>> field1, @FieldAccess("field2") Type2 field2) {
>>>>       ...
>>>>     }
>>>> }
>>>>
>>>> It also supports selecting wildcards (e.g. @FieldAccess("top.*")).
>>>>
>>>
>>> BTW, how is this different than @FieldAccess("top")?
>>>
>>>
>>>> I'm not sure how this pattern would translate into the Python SDK
>>>> though.
>>>>
>>>
>>> It would probably look like
>>>
>>>   def process(field1=FieldAccess("field1"), ...)
>>>
>>> though in Python there's much less need as Row objects are not as
>>> cumbersome to use, e.g.
>>>
>>>   def process(row):
>>>     # access row.field1 directly here
>>>
>>> though it could be useful for optimizations like projection lifting.
>>>
>>>
>>> As for the original question, the value here is being able to adapt a
>>> DoFn<T, O> to apply to a single field of a Row? This certainly seems to
>>> have value. I might suggest a syntax like
>>>
>>>     schema_pcoll | DoToField(SomeDoFn(), input_field="element",
>>> output_field="word")
>>>
>>> and preserve rather than elide the input field (at least as an option).
>>>
>>> Does this handle the full DoFn spec (e.g. bundle start/finish, WIndowFn
>>> params, state and timers, etc.)?
>>>
>>> On Sat, May 10, 2025 at 3:35 AM Joey Tran <joey.t...@schrodinger.com>
>>>> wrote:
>>>>
>>>>> Not currently
>>>>>
>>>>> On Sat, May 10, 2025, 12:48 AM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Does this work with nested fields? Can you specify
>>>>>> Input_field="a.b.c"?
>>>>>>
>>>>>> On Fri, May 9, 2025 at 7:18 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Sure!
>>>>>>>
>>>>>>> Given a DoFn that has...
>>>>>>>
>>>>>>> def process(self, sentence):
>>>>>>>     yield from sentence.split()
>>>>>>>
>>>>>>>
>>>>>>> You could use it with SchemadParDo as:
>>>>>>>
>>>>>>> (p | beam.Create([pvalue.Row(element="hello world", id="id")])
>>>>>>> | SchemadParDo(SchemadParDo(SplitSentenceDoFn(),
>>>>>>> input_field="element", output_field="word"))
>>>>>>>
>>>>>>> And it'd produce Row(word="hello", id="id") and Row(word=""world",
>>>>>>> id="id")
>>>>>>>
>>>>>>> On Fri, May 9, 2025, 9:57 PM Reuven Lax via dev <dev@beam.apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Can you explain a bit how SchemadParDo works?
>>>>>>>>
>>>>>>>> On Fri, May 9, 2025 at 4:49 PM Joey Tran <joey.t...@schrodinger.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I've written a `SchemadParDo(input_field: str, output_field,
>>>>>>>>> dofn:DoFn)` transform for more easily writing a Schemad transform 
>>>>>>>>> given a
>>>>>>>>> DoFn.
>>>>>>>>>
>>>>>>>>> Is this something worth upstreaming into the Beam Python SDK? I
>>>>>>>>> wrote it to make it easier to convert our current set of dofn's into
>>>>>>>>> schemad dofns for use with the YAML SDK. Just wanted to gauge interest
>>>>>>>>> before setting up the dev env again
>>>>>>>>>
>>>>>>>>

Reply via email to