On Thu, Jun 11, 2020 at 1:26 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> I'd propose the following:
>
>  - delete all DoFnSignatures.{usesState,usesTimers,...} helpers *except*
> for DoFnSignatures.isStateful
>
Why? Actually, it seems like the opposite might be better (remove isStateful
and keep the others). There are cases where it might be useful to know if
just timers are used.

>  - DoFnSignatures.isStateful would be equal to 'signature.usesState() ||
> signature.usesTimers() ||
> signature.processElement().requiresTimeSortedInput()'
>
requiresTimeSortedInput does not imply isStateful in general - that seems
like a runner-dependent thing.
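
To make the distinction concrete, here is a minimal sketch, not actual Beam
code. SignatureFacts is a hypothetical stand-in for the handful of
DoFnSignature properties discussed in this thread, and
needsStatefulExecution / runnerPresortsInput are made-up names for a
derived property that a runner would own. It only assumes what was said
above: a runner that can presort the data does not need state or timers
for time-sorted input.

```java
public class RunnerStatefulnessSketch {

  /** Hypothetical stand-in for the DoFnSignature facts discussed here. */
  static final class SignatureFacts {
    final boolean usesState;
    final boolean usesTimers;
    final boolean requiresTimeSortedInput;

    SignatureFacts(boolean usesState, boolean usesTimers, boolean requiresTimeSortedInput) {
      this.usesState = usesState;
      this.usesTimers = usesTimers;
      this.requiresTimeSortedInput = requiresTimeSortedInput;
    }
  }

  /** What the DoFn itself declares: state or timers, nothing runner-specific. */
  static boolean declaresStateOrTimers(SignatureFacts s) {
    return s.usesState || s.usesTimers;
  }

  /**
   * Runner-side derived property (hypothetical). Time-sorted input only forces
   * the stateful path when this runner cannot presort the input itself.
   */
  static boolean needsStatefulExecution(SignatureFacts s, boolean runnerPresortsInput) {
    return declaresStateOrTimers(s) || (s.requiresTimeSortedInput && !runnerPresortsInput);
  }

  public static void main(String[] args) {
    // A DoFn with no state or timers, annotated only for time-sorted input.
    SignatureFacts sortedOnly = new SignatureFacts(false, false, true);

    System.out.println(declaresStateOrTimers(sortedOnly));         // false
    System.out.println(needsStatefulExecution(sortedOnly, false)); // true
    System.out.println(needsStatefulExecution(sortedOnly, true));  // false
  }
}
```

The point of keeping the two methods apart is that the first one describes
the DoFn and the second one describes a particular runner's execution
strategy.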


>  - fix all _relevant_ places in all runners that currently check for
> statefulness with 'doFnSignature.stateDeclarations().size() > 0' or
> 'doFnSignature.usesState()', but with the semantics of 'DoFnSignatures.isStateful()'
>
> WDYT?
> On 5/31/20 2:27 PM, Jan Lukavský wrote:
>
> On 5/30/20 5:39 AM, Kenneth Knowles wrote:
>
> Agree to delete them, though for different reasons. I think this code
> comes from a desire to have methods that can be called on a DoFn directly.
> And from reviewing the code history I think they are copied in from another
> class. So that's why they are the way they are. Accepting a DoFnSignature
> would be more appropriate to the "plural-class-name companion class"
> pattern. But I doubt the perf impact of this is ever measurable, and of
> course not relative to a big data processing job. If we really wanted the
> current API, a cache is trivial, but also not important so we shouldn't add
> one.
>
>
> Reason I think they should be deleted:
> 1. They seem to exist as a shortcut so people don't forget to call both
> DoFnSignatures#usesState and DoFnSignatures#usesTimers [1]. But now if
> another relevant method is added, the new method doesn't include it, so the
> problem of not forgetting to call all relevant methods is not solved.
>
> There are multiple ways runners test for "statefulness" of a DoFn. Some
> use DoFnSignature#usesState(), some DoFnSignatures#usesState(), some
> DoFnSignatures#isStateful() and some even
> DoFnSignature.stateDeclarations() > 0. Having so many ways to perform a simple
> check of whether a DoFn needs to be executed as stateful seems suboptimal.
>
> I don't see anything weird in the definition of "stateful DoFn", which is any
> DoFn, that has the following requirements:
>
>  a) is keyed
>
>  b) requires shuffling same keys to same workers
>
>  c) requires support for both state and timers
>
> 2. They seem incorrect [2]. Just because something requires time sorted
> input *does not* mean it uses bag state.
>
> Yes, this is unfortunate. What makes the DoFn use bag state is "when the
> runner executes the DoFn using default expansion". I agree this is not the
> same, but the correct solution again seems rooted in the discussion about
> pipeline requirements vs. runner capabilities vs. default and overridden
> expansions. It would be better to use the standard expansion mechanism, but
> AFAIK it is not possible currently, because it is not possible to simply
> wrap two stateful dofns one inside another (that would require dynamic
> states).
>
> Jan
>
>
> Kenn
>
> [1]
> https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2432
> [2]
> https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2449
>
> On Fri, May 29, 2020 at 8:46 AM Luke Cwik <lc...@google.com> wrote:
>
>> To go back to your original question.
>>
>> I would remove the static convenience methods in DoFnSignatures since
>> they construct a DoFnSignature and then throw it away. This construction is
>> pretty involved, nothing as large as an IO call but it would become
>> noticeable if it was abused. We can already see that it is being used
>> multiple times in a row [1, 2].
>>
>> Runners should create their own derived properties based upon knowledge
>> of how they are implemented and we shouldn't create derived properties for
>> different concepts (e.g. merging isStateful and @RequiresTimeSortedInput).
>> If there is a common implementation that is shared across multiple runners,
>> it could "translate" a DoFnSignature based upon how it is implemented
>> and/or define its own thing.
>>
>> 1:
>> https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java#L61
>> 2:
>> https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java#L73
>>
>> On Wed, May 27, 2020 at 3:16 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Right, this might be about a definition of what these methods really
>>> should return. Currently, the most visible issue is [1]. When a DoFn has no
>>> state or timer, but is annotated with @RequiresTimeSortedInput this
>>> annotation is silently ignored, because DoFnSignature#usesState returns
>>> false and the ParDo is executed as stateless.
>>>
>>> I agree that there are two points - what user declares and what runner
>>> effectively needs to execute a DoFn. Another complication to this is that
>>> what runner needs might depend not only on the DoFn itself, but on other
>>> conditions - e.g. RequiresTimeSortedInput does not require any state or
>>> timer in bounded case, when runner can presort the data. There might be
>>> additional inputs to this decision as well.
>>>
>>> I don't quite agree that DoFnSignature#isStateful is a bad name - when a
>>> DoFn has only timer and no state, it is still stateful, although usesState
>>> should return false. Or we would have to declare timer a state, which would
>>> be even more confusing (although it might be technically correct).
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-10072
>>> On 5/27/20 1:21 AM, Luke Cwik wrote:
>>>
>>> I believe DoFnSignature#isStateful is a remnant of a bad API name choice
>>> and was renamed to usesState. I would remove DoFnSignature#isStateful as it
>>> does not seem to be used anywhere.
>>>
>>> Does DoFnSignatures#usesValueState return true if the DoFn says it needs
>>> @RequiresTimeSortedInput because of how a DoFn is being "wrapped" with a
>>> stateful DoFn that provides the time sorting functionality?
>>>
>>> That doesn't seem right since I would have always expected that
>>> DoFnSignature(s) should be about the DoFn passed in and not about the
>>> implementation details that a runner might be using in how it
>>> provides @RequiresTimeSortedInput.
>>>
>>> (similarly for
>>> DoFnSignatures#usesBagState, DoFnSignatures#usesWatermarkHold, 
>>> DoFnSignatures#usesTimers, DoFnSignatures#usesState)
>>>
>>>
>>>
>>>
>>> On Mon, May 25, 2020 at 2:31 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have come across an issue with multiple ways of getting meaningful flags
>>>> for DoFns. We have
>>>>
>>>>   a) DoFnSignature#{usesState,usesTimers,isStateful,...}, and
>>>>
>>>>   b) DoFnSignatures#{usesState,usesTimers,isStateful,...}
>>>>
>>>> These two might not be (and actually are not) aligned with each other. That
>>>> can be solved quite easily (by removing any logic from DoFnSignatures and
>>>> putting it into DoFnSignature), but what I'm not sure about is why
>>>> DoFnSignature#isStateful is deprecated in favor of
>>>> DoFnSignature#usesState. In my understanding, it should hold that
>>>> `isStateful() iff usesState() || usesTimers()`, which means these two
>>>> should not be used interchangeably. I'd suggest undeprecating
>>>> `DoFnSignature#isStateful` and aligning the various (static and non-static)
>>>> versions of these calls.
>>>>
>>>> Thoughts?
>>>>
>>>>   Jan
>>>>
>>>>
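
For reference, the relation proposed in the original post,
`isStateful() iff usesState() || usesTimers()`, and the way the different
checks mentioned in the thread can disagree, can be sketched as follows.
This is only an illustration: Signature is a hypothetical stand-in (not
Beam's DoFnSignature), the declaration maps hold placeholder strings, and
the "flush" timer is invented for the example.

```java
import java.util.Collections;
import java.util.Map;

public class StatefulnessChecksSketch {

  /** Hypothetical stand-in for a parsed signature; values are placeholders. */
  static final class Signature {
    final Map<String, String> stateDeclarations;
    final Map<String, String> timerDeclarations;

    Signature(Map<String, String> stateDeclarations, Map<String, String> timerDeclarations) {
      this.stateDeclarations = stateDeclarations;
      this.timerDeclarations = timerDeclarations;
    }

    boolean usesState() {
      return !stateDeclarations.isEmpty();
    }

    boolean usesTimers() {
      return !timerDeclarations.isEmpty();
    }

    /** The relation from the original post: state or timers. */
    boolean isStateful() {
      return usesState() || usesTimers();
    }
  }

  /** A DoFn that declares one timer and no state. */
  static Signature timerOnly() {
    return new Signature(
        Collections.emptyMap(), Collections.singletonMap("flush", "event-time"));
  }

  public static void main(String[] args) {
    Signature s = timerOnly();

    // Three checks that are sometimes treated as interchangeable:
    System.out.println(s.stateDeclarations.size() > 0); // false
    System.out.println(s.usesState());                  // false
    System.out.println(s.isStateful());                 // true
  }
}
```

For a timer-only DoFn the first two checks say "stateless" while the third
says "stateful", which is the mismatch the thread is about.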
