On Thu, Jun 11, 2020 at 1:26 AM Jan Lukavský <je...@seznam.cz> wrote:
> Hi,
>
> I'd propose the following:
>
>  - delete all DoFnSignatures.{usesState,usesTimers,...} helpers *except*
> for DoFnSignatures.isStateful
>

Why? Actually, it seems like maybe the opposite is better (remove isStateful and keep the others). There are cases where it might be useful to know if just timers are used.

>  - DoFnSignatures.isStateful would be equal to 'signature.usesState() ||
> signature.usesTimers() ||
> signature.processElement().requiresTimeSortedInput()'
>

requiresTimeSortedInput does not imply isStateful in general - that seems like a runner-dependent thing.

>  - fix all _relevant_ places in all runners where there are currently checks for
> statefulness like 'doFnSignature.stateDeclarations().size() > 0' or
> 'doFnSignature.usesState()', but with semantics 'DoFnSignatures.isStateful()'
>
> WDYT?
>
> On 5/31/20 2:27 PM, Jan Lukavský wrote:
>
> On 5/30/20 5:39 AM, Kenneth Knowles wrote:
>
> Agree to delete them, though for different reasons. I think this code
> comes from a desire to have methods that can be called on a DoFn directly.
> And from reviewing the code history I think they are copied in from another
> class. So that's why they are the way they are. Accepting a DoFnSignature
> would be more appropriate to the "plural-class-name companion class"
> pattern. But I doubt the perf impact of this is ever measurable, and of
> course not relative to a big data processing job. If we really wanted the
> current API, a cache is trivial, but also not important so we shouldn't add
> one.
>
> Reason I think they should be deleted:
> 1. They seem to exist as a shortcut so people don't forget to call both
> DoFnSignatures#usesState and DoFnSignatures#usesTimers [1]. But now if
> another relevant method is added, the new method doesn't include it, so the
> problem of not forgetting to call all relevant methods is not solved.
>
> There are multiple ways runners test for "statefulness" of a DoFn.
> Some use DoFnSignature#usesState(), some DoFnSignatures#usesState(), some
> DoFnSignatures#isStateful() and some even
> DoFnSignature.stateDeclarations() > 0. Having so many ways to perform a simple
> check of whether a DoFn needs to be executed as stateful seems suboptimal.
>
> I don't see anything weird in the definition of a "stateful DoFn", which is any
> DoFn that has the following requirements:
>
>  a) is keyed
>
>  b) requires shuffling same keys to same workers
>
>  c) requires support for both state and timers
>
> 2. They seem incorrect [2]. Just because something requires time sorted
> input *does not* mean it uses bag state.
>
> Yes, this is unfortunate. What makes the DoFn use bag state is "when the
> runner executes the DoFn using default expansion". I agree this is not the
> same, but the correct solution seems again rooted in the discussion about
> pipeline requirements vs. runner capabilities vs. default and overridden
> expansions. It would be better to use the standard expansion mechanism, but
> AFAIK it is not possible currently, because it is not possible to simply
> wrap two stateful DoFns one inside another (that would require dynamic
> states).
>
> Jan
>
> Kenn
>
> [1]
> https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2432
> [2]
> https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2449
>
> On Fri, May 29, 2020 at 8:46 AM Luke Cwik <lc...@google.com> wrote:
>
>> To go back to your original question.
>>
>> I would remove the static convenience methods in DoFnSignatures since
>> they construct a DoFnSignature and then throw it away. This construction is
>> pretty involved, nothing as large as an IO call but it would become
>> noticeable if it was abused.
>> We can already see that it is being used
>> multiple times in a row [1, 2].
>>
>> Runners should create their own derived properties based upon knowledge
>> of how they are implemented and we shouldn't create derived properties for
>> different concepts (e.g. merging isStateful and @RequiresTimeSortedInput).
>> If there is a common implementation that is shared across multiple runners,
>> it could "translate" a DoFnSignature based upon how it is implemented
>> and/or define its own thing.
>>
>> 1:
>> https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java#L61
>> 2:
>> https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java#L73
>>
>> On Wed, May 27, 2020 at 3:16 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Right, this might be about a definition of what these methods really
>>> should return. Currently, the most visible issue is [1]. When a DoFn has no
>>> state or timer, but is annotated with @RequiresTimeSortedInput this
>>> annotation is silently ignored, because DoFnSignature#usesState returns
>>> false and the ParDo is executed as stateless.
>>>
>>> I agree that there are two points - what user declares and what runner
>>> effectively needs to execute a DoFn. Another complication to this is that
>>> what runner needs might depend not only on the DoFn itself, but on other
>>> conditions - e.g. RequiresTimeSortedInput does not require any state or
>>> timer in bounded case, when runner can presort the data. There might be
>>> additional inputs to this decision as well.
>>>
>>> I don't quite agree that DoFnSignature#isStateful is a bad name - when a
>>> DoFn has only timer and no state, it is still stateful, although usesState
>>> should return false.
>>> Or we would have to declare timer a state, which would
>>> be even more confusing (although it might be technically correct).
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-10072
>>>
>>> On 5/27/20 1:21 AM, Luke Cwik wrote:
>>>
>>> I believe DoFnSignature#isStateful is a remnant of a bad API name choice
>>> and was renamed to usesState. I would remove DoFnSignature#isStateful as it
>>> does not seem to be used anywhere.
>>>
>>> Does DoFnSignatures#usesValueState return true if the DoFn says it needs
>>> @RequiresTimeSortedInput because of how a DoFn is being "wrapped" with a
>>> stateful DoFn that provides the time sorting functionality?
>>>
>>> That doesn't seem right since I would have always expected that
>>> DoFnSignature(s) should be about the DoFn passed in and not about the
>>> implementation details that a runner might be using in how it
>>> provides @RequiresTimeSortedInput.
>>>
>>> (similarly for
>>> DoFnSignatures#usesBagState, DoFnSignatures#usesWatermarkHold,
>>> DoFnSignatures#usesTimers, DoFnSignatures#usesState)
>>>
>>> On Mon, May 25, 2020 at 2:31 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have come across an issue with multiple ways of getting meaningful
>>>> flags for DoFns. We have
>>>>
>>>>  a) DoFnSignature#{usesState,usesTimers,isStateful,...}, and
>>>>
>>>>  b) DoFnSignatures#{usesState,usesTimers,isStateful,...}
>>>>
>>>> These two might not be (and actually are not) aligned with each other.
>>>> That can be solved quite easily (removing any logic from DoFnSignatures and
>>>> putting it into DoFnSignature), but what I'm not sure about is why
>>>> DoFnSignature#isStateful is deprecated in favor of
>>>> DoFnSignature#usesState. In my understanding, it should hold that
>>>> `isStateful() iff usesState() || usesTimers()`, which means these two
>>>> should not be used interchangeably.
>>>> I'd suggest to undeprecate
>>>> `DoFnSignature#isStateful` and align the various (static and non-static)
>>>> versions of these calls.
>>>>
>>>> Thoughts?
>>>>
>>>> Jan
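
For reference, the three readings of "stateful" discussed in this thread can be put side by side in a minimal sketch. It deliberately does not use the Beam classes: SignatureFlags, Statefulness, and every method name below are hypothetical stand-ins assumed only for illustration, and the runnerPresortsInput parameter is an assumed way to model the "runner can presort the data" case from the bounded example.

```java
// Hypothetical model of the three facts discussed in the thread.
// This is NOT Beam's DoFnSignature; it only mirrors the flags by name.
final class SignatureFlags {
  final boolean usesState;
  final boolean usesTimers;
  final boolean requiresTimeSortedInput;

  SignatureFlags(boolean usesState, boolean usesTimers, boolean requiresTimeSortedInput) {
    this.usesState = usesState;
    this.usesTimers = usesTimers;
    this.requiresTimeSortedInput = requiresTimeSortedInput;
  }
}

// Hypothetical helper collecting the competing definitions.
final class Statefulness {
  private Statefulness() {}

  // Reading from the first message: isStateful() iff usesState() || usesTimers().
  static boolean declaredStateful(SignatureFlags s) {
    return s.usesState || s.usesTimers;
  }

  // Reading from the latest proposal: time-sorted input also counts as stateful.
  static boolean proposedIsStateful(SignatureFlags s) {
    return s.usesState || s.usesTimers || s.requiresTimeSortedInput;
  }

  // Runner-derived property: the DoFn's declaration plus what this particular
  // runner needs. A runner that presorts its input (assumed flag) needs no
  // extra state to honor the time-sorted requirement.
  static boolean runnerNeedsStatefulExecution(SignatureFlags s, boolean runnerPresortsInput) {
    return declaredStateful(s) || (s.requiresTimeSortedInput && !runnerPresortsInput);
  }
}
```

With a DoFn that declares neither state nor timers but requires time-sorted input, the first definition says "stateless", the second says "stateful", and the third depends on the runner, which is the disagreement above in a nutshell.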