Pablo, all the SplittableDoFn stuff is marked as @Experimental, so we are
free to change it. Only one usage, in Watch.java, is complicated to change;
the rest are quite straightforward.

On Mon, Apr 29, 2019 at 2:23 PM Pablo Estrada <pabl...@google.com> wrote:

> Thanks all,
>  @Luke - I imagine that would be an improvement to the API, but this may
> be harder as this is already available to users, and there are those who
> have implemented SDFs under the current API. Would it be possible to make a
> backwards-compatible change to the API here?
>
> For the Python changes, I've proposed a pull request:
> https://github.com/apache/beam/pull/8430 - it was smaller than I thought
> : ) - All comments welcome please.
>
> +Boyuan Zhang <boyu...@google.com> I am happy to wait for your
> SyntheticSource PR to be merged and make the appropriate changes if you'd
> like.
> Best
> -P.
>
> On Mon, Apr 29, 2019 at 8:23 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> Would it make sense to also do this in the Java SDK?
>>
>> This would make the restriction provider also mirror the TimerSpec and
>> StateSpec which use annotations similar to how it's done in Python.
>>
>> On Mon, Apr 29, 2019 at 3:42 AM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> +1 to introducing this Param for consistency (and making the
>>> substitution more obvious), and I think SDF is still new/experimental
>>> enough we can do this. I don't know if we need Spec in addition to
>>> Param and Provider.
>>>
>>> On Sat, Apr 27, 2019 at 1:07 AM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>> >
>>> >
>>> >
>>> > On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada <pabl...@google.com>
>>> wrote:
>>> >>
>>> >> Hi all,
>>> >> Sorry about the wall of text.
>>> >> So, first of all, I thought about this while reviewing a PR by Boyuan
>>> with an example of an SDF[1]. This is very exciting btw : ).
>>> >>
>>> >> Anyway... I certainly have a limited view of the whole SDF effort,
>>> but I think it's worth discussing this particular point about the API
>>> before finalizing SDF and making it widely available. So here I go:
>>> >>
>>> >> The Python API for SDF asks users to provide a restriction provider
>>> in their process function signature. More or less the following:
>>> >>
>>> >> class MyOwnLittleSDF(beam.DoFn):
>>> >>   def process(self, element,
>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>> >>     # My DoFn logic...
>>> >>
>>> >> This is all fine, but something that I found a little odd is that the
>>> restriction provider gets replaced at runtime with a restriction tracker:
>>> >>
>>> >> class MyOwnLittleSDF(beam.DoFn):
>>> >>   def process(self, element,
>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>> >>     # This assert succeeds : )
>>> >>     assert not isinstance(restriction_tracker,
>>> >>                           MyOwnLittleRestrictionProvider)
>>> >>
>>> >> After thinking a little bit about it, I realized that the default
>>> argument simply allows us to inform the runner where to find the
>>> restriction provider; but that the thing that we need at runtime is NOT the
>>> restriction provider - but rather, the restriction tracker.
>>> >>
>>> >> A similar pattern occurs with state and timers, where the runner
>>> needs to know the sort of state, the coder for the values in that state (or
>>> the time domain for timers); but the runtime parameter is different[2]. For
>>> state and timers (and window, timestamp, pane, etc.) we provide a pattern
>>> where users give a default value that is clearly a placeholder:
>>> beam.DoFn.TimerParam, or beam.DoFn.StateParam.
>>> >
>>> >
>>> > This is the way (new) DoFns work in the Python SDK. The SDK (harness)
>>> identifies the meanings of different (potential) arguments to a DoFn using
>>> pre-defined default values.
>>> >
>>> >>
>>> >>
>>> >> In this case, the API is fairly similar, but (at least in my
>>> imagination), it is much more clear about how the DoFnParam will be
>>> replaced with something else at runtime. A similar change could be done for
>>> SDF:
>>> >>
>>> >> class MyOwnLittleSDF(beam.DoFn):
>>> >>   MY_RESTRICTION = \
>>> >>       RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
>>> >>
>>> >>   def process(
>>> >>       self, element,
>>> >>       restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
>>> >>     # My DoFn logic..
>>> >
>>> >
>>> >
>>> > If I understood correctly, what you propose is similar to the existing
>>> solution but we add an XXXParam parameter for consistency?
>>> > I think this is fine and should be a relatively small change. The main
>>> point is that the SDK should be able to find the RestrictionProvider class
>>> to utilize its methods before passing elements to the DoFn.process() method:
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L241
>>> >
>>> >
>>> >>
>>> >>
>>> >> Perhaps it is a good opportunity to consider this, since SDF is still
>>> in progress.
>>> >>
>>> >> Some pros:
>>> >> - Consistent with other parameters that we pass to DoFn methods
>>> >> - A bit more clear about what will happen at runtime
>>> >>
>>> >> Some cons:
>>> >> - SDF developers are "power users", and will have gone through the
>>> SDF documentation. This point will be clear to them.
>>> >> - This may create unnecessary work, and perhaps unintended
>>> consequences.
>>> >> - I bet there's more
>>> >>
>>> >> Thoughts?
>>> >>
>>> >> -P.
>>> >>
>>> >> [1] https://github.com/apache/beam/pull/8338
>>> >> [2]
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586
>>> .
>>> >>
>>> >>
>>> >>
>>>
>>
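
The placeholder substitution discussed in this thread (a default argument that
only tells the harness where to find the restriction provider, and is replaced
by a restriction tracker at call time) can be sketched in plain Python. This is
an illustrative sketch, not Beam's implementation: the classes and the
invoke_process helper below are hypothetical stand-ins, and the real SDK logic
lives in apache_beam/runners/common.py.

```python
import inspect


class RestrictionProvider:
    """Hypothetical stand-in: knows how to build a tracker for an element."""

    def initial_restriction(self, element):
        # Assumption for this sketch: a restriction is a (start, stop) range.
        return (0, len(element))

    def create_tracker(self, restriction):
        return RestrictionTracker(restriction)


class RestrictionTracker:
    """Hypothetical stand-in: the object process() actually uses at runtime."""

    def __init__(self, restriction):
        self.start, self.stop = restriction

    def try_claim(self, position):
        return self.start <= position < self.stop


class RestrictionParam:
    """Placeholder default; it only tells the harness which provider to use."""

    def __init__(self, provider):
        self.provider = provider


def invoke_process(dofn, element):
    """Call dofn.process, swapping each RestrictionParam default for a tracker."""
    kwargs = {}
    for name, param in inspect.signature(dofn.process).parameters.items():
        default = param.default
        if isinstance(default, RestrictionParam):
            provider = default.provider
            restriction = provider.initial_restriction(element)
            kwargs[name] = provider.create_tracker(restriction)
    return list(dofn.process(element, **kwargs))


class MyOwnLittleSDF:
    def process(self, element,
                restriction_tracker=RestrictionParam(RestrictionProvider())):
        # At runtime this is a tracker, not the placeholder.
        assert isinstance(restriction_tracker, RestrictionTracker)
        position = 0
        while restriction_tracker.try_claim(position):
            yield element[position]
            position += 1
```

Calling invoke_process(MyOwnLittleSDF(), "abc") hands process() a tracker built
by the provider, so the RestrictionParam default never reaches user code. That
is the behaviour the proposed beam.DoFn.RestrictionParam would make explicit.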
