On 5/30/20 5:39 AM, Kenneth Knowles wrote:
Agree to delete them, though for different reasons. I think this code
comes from a desire to have methods that can be called on a DoFn
directly. And from reviewing the code history I think they are copied
in from another class. So that's why they are the way they are.
Accepting a DoFnSignature would be more appropriate to the
"plural-class-name companion class" pattern. But I doubt the perf
impact of this is ever measurable, and of course not relative to a big
data processing job. If we really wanted the current API, a cache is
trivial, but also not important so we shouldn't add one.
Reason I think they should be deleted:
1. They seem to exist as a shortcut so people don't forget to call
both DoFnSignatures#usesState and DoFnSignatures#usesTimers [1]. But
if another relevant method is added later, the shortcut doesn't
include it, so the problem of forgetting to call all relevant
methods is not solved.
There are multiple ways runners test for "statefulness" of a DoFn. Some
use DoFnSignature#usesState(), some DoFnSignatures#usesState(), some
DoFnSignatures#isStateful() and some even
DoFnSignature#stateDeclarations().size() > 0. Having so many ways to do
a simple check of whether a DoFn needs to be executed as stateful seems
suboptimal.
I don't see anything weird about the definition of a "stateful DoFn",
which is any DoFn that has the following requirements:
a) is keyed
b) requires shuffling same keys to same workers
c) requires support for both state and timers
2. They seem incorrect [2]. Just because something requires time
sorted input *does not* mean it uses bag state.
Yes, this is unfortunate. What makes the DoFn use bag state is "when the
runner executes the DoFn using the default expansion". I agree this is
not the same, but the correct solution again seems rooted in the
discussion about pipeline requirements vs. runner capabilities vs.
default and overridden expansions. It would be better to use the
standard expansion mechanism, but AFAIK that is not currently possible,
because one stateful DoFn cannot simply be wrapped inside another (that
would require dynamic states).
Jan
Kenn
[1]
https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2432
[2]
https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2449
On Fri, May 29, 2020 at 8:46 AM Luke Cwik <lc...@google.com> wrote:
To go back to your original question.
I would remove the static convenience methods in DoFnSignatures
since they construct a DoFnSignature and then throw it away. This
construction is pretty involved, nothing as large as an IO call
but it would become noticeable if it was abused. We can already
see that it is being used multiple times in a row [1, 2].
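The cost pattern described above can be sketched with stand-in types. This is only an illustration: `ParsedSignature`, `SignatureParsing`, and the `parseCount` counter are hypothetical names, not Beam's classes, and the "analysis" is a stub that pretends every class uses timers but no state.

```java
// Hypothetical stand-in for an analyzed DoFn signature; not Beam's DoFnSignature.
final class ParsedSignature {
    final boolean usesState;
    final boolean usesTimers;

    ParsedSignature(boolean usesState, boolean usesTimers) {
        this.usesState = usesState;
        this.usesTimers = usesTimers;
    }
}

// Hypothetical stand-in for a companion class with static convenience methods.
final class SignatureParsing {
    // Counts how often the (pretend-expensive) analysis runs.
    static int parseCount = 0;

    // Stands in for the reflective analysis of a DoFn class.
    // Assumption for illustration: the result is always "timers, no state".
    static ParsedSignature parse(Class<?> fnClass) {
        parseCount++;
        return new ParsedSignature(false, true);
    }

    // Convenience style: each call analyzes the class and discards the result.
    static boolean usesState(Class<?> fnClass) {
        return parse(fnClass).usesState;
    }

    static boolean usesTimers(Class<?> fnClass) {
        return parse(fnClass).usesTimers;
    }

    // Two convenience calls in a row: the class is analyzed twice.
    static boolean statefulViaConvenience(Class<?> fnClass) {
        return usesState(fnClass) || usesTimers(fnClass);
    }

    // Analyze once, then query the same object: one analysis.
    static boolean statefulViaSignature(Class<?> fnClass) {
        ParsedSignature signature = parse(fnClass);
        return signature.usesState || signature.usesTimers;
    }
}
```

Both variants give the same answer; the only difference in this sketch is how many times `parse` runs.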
Runners should create their own derived properties based upon
knowledge of how they are implemented and we shouldn't create
derived properties for different concepts (e.g. merging isStateful
and @RequiresTimeSortedInput). If there is a common implementation
that is shared across multiple runners, it could "translate" a
DoFnSignature based upon how it is implemented and/or define its
own thing.
1:
https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java#L61
2:
https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java#L73
On Wed, May 27, 2020 at 3:16 AM Jan Lukavský <je...@seznam.cz> wrote:
Right, this might be about the definition of what these methods
really should return. Currently, the most visible issue is
[1]. When a DoFn has no state or timer but is annotated with
@RequiresTimeSortedInput, the annotation is silently ignored,
because DoFnSignature#usesState returns false and the ParDo is
executed as stateless.
I agree that there are two points - what the user declares and
what the runner effectively needs to execute a DoFn. Another
complication is that what the runner needs might depend
not only on the DoFn itself, but on other conditions - e.g.
RequiresTimeSortedInput does not require any state or timer in
the bounded case, when the runner can presort the data. There
might be additional inputs to this decision as well.
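A rough sketch of the distinction between what is declared and what a runner needs, assuming the inputs mentioned above. Everything here is hypothetical: `ExecutionPlanSketch`, its `Mode` enum, and the boolean parameters are illustrative names, not Beam or runner APIs, and a real runner may weigh further inputs.

```java
// Hypothetical sketch of a runner-side decision; not Beam or runner code.
final class ExecutionPlanSketch {
    enum Mode { STATELESS, STATELESS_WITH_PRESORT, STATEFUL }

    // Naive check: looks only at declared state, so a DoFn that merely
    // requires time-sorted input is executed as stateless and the
    // requirement is silently dropped.
    static Mode naive(boolean usesState) {
        return usesState ? Mode.STATEFUL : Mode.STATELESS;
    }

    // Context-aware check: combines what the DoFn declares with conditions
    // outside the DoFn (input boundedness, whether the runner can presort).
    static Mode contextAware(
            boolean usesState,
            boolean usesTimers,
            boolean requiresTimeSortedInput,
            boolean boundedInput,
            boolean runnerCanPresort) {
        if (usesState || usesTimers) {
            return Mode.STATEFUL;
        }
        if (requiresTimeSortedInput) {
            // In the bounded case a presorting runner needs no state or timers.
            return (boundedInput && runnerCanPresort)
                    ? Mode.STATELESS_WITH_PRESORT
                    : Mode.STATEFUL;
        }
        return Mode.STATELESS;
    }
}
```

For a DoFn with no state or timers that requires time-sorted input, `naive(false)` yields `STATELESS`, while `contextAware` yields `STATEFUL` for unbounded input and `STATELESS_WITH_PRESORT` for bounded input on a runner that can presort.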
I don't quite agree that DoFnSignature#isStateful is a bad
name - when a DoFn has only a timer and no state, it is still
stateful, although usesState should return false. Otherwise we
would have to declare a timer to be state, which would be even
more confusing (although it might be technically correct).
[1] https://issues.apache.org/jira/browse/BEAM-10072
On 5/27/20 1:21 AM, Luke Cwik wrote:
I believe DoFnSignature#isStateful is a remnant of a bad API
name choice and was renamed to usesState. I would remove
DoFnSignature#isStateful as it does not seem to be used
anywhere.
Does DoFnSignatures#usesValueState return true if the DoFn
says it needs @RequiresTimeSortedInput because of how a DoFn
is being "wrapped" with a stateful DoFn that provides the
time sorting functionality?
That doesn't seem right since I would have always expected
that DoFnSignature(s) should be about the DoFn passed in and
not about the implementation details that a runner might be
using in how it provides @RequiresTimeSortedInput.
(similarly for
DoFnSignatures#usesBagState, DoFnSignatures#usesWatermarkHold,
DoFnSignatures#usesTimers, DoFnSignatures#usesState)
On Mon, May 25, 2020 at 2:31 AM Jan Lukavský <je...@seznam.cz> wrote:
Hi,
I have come across an issue with multiple ways of getting meaningful
flags for DoFns. We have
a) DoFnSignature#{usesState,usesTimers,isStateful,...}, and
b) DoFnSignatures#{usesState,usesTimers,isStateful,...}
These two might not be (and actually are not) aligned with each other.
That can be solved quite easily (by moving any logic from DoFnSignatures
into DoFnSignature), but what I'm not sure about is why
DoFnSignature#isStateful is deprecated in favor of
DoFnSignature#usesState. In my understanding, it should hold that
`isStateful() iff usesState() || usesTimers()`, which means these two
should not be used interchangeably. I'd suggest undeprecating
`DoFnSignature#isStateful` and aligning the various (static and
non-static) versions of these calls.
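The proposed relationship can be written down as a minimal sketch. `SignatureSketch` is a hypothetical stand-in that reduces a signature to two declaration counts; it is not Beam's DoFnSignature and only illustrates the invariant.

```java
// Hypothetical stand-in for a DoFn signature; not Beam's DoFnSignature.
final class SignatureSketch {
    private final int stateDeclarationCount;
    private final int timerDeclarationCount;

    SignatureSketch(int stateDeclarationCount, int timerDeclarationCount) {
        this.stateDeclarationCount = stateDeclarationCount;
        this.timerDeclarationCount = timerDeclarationCount;
    }

    boolean usesState() {
        return stateDeclarationCount > 0;
    }

    boolean usesTimers() {
        return timerDeclarationCount > 0;
    }

    // The invariant under discussion: stateful iff state or timers are used.
    boolean isStateful() {
        return usesState() || usesTimers();
    }
}
```

With one timer and no state, `usesState()` is false while `isStateful()` is true, which is why the two cannot be used interchangeably under this definition.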
Thoughts?
Jan