je-ik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r444722211
##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
##########
@@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform
public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
ParDoPayload payload = getParDoPayload(transform);
- return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0;
+ return payload.getStateSpecsCount() > 0
+ || payload.getTimerFamilySpecsCount() > 0
+ || payload.getRequiresTimeSortedInput();
Review comment:
That is correct. I'm aware there was no clear consensus, so I tried to
minimize the changes needed to fix the underlying issue. I think that
RequiresTimeSortedInput implies "statefulness" (which is what this method
actually checks), because there is no way to sort data without state, at least
in the generic streaming case. The alternative seems to be introducing a new
`PTransformMatcher`, but that would serve only "stateless" ordered DoFns,
which, although legitimate, are something of a corner case. The vast majority
of uses of RequiresTimeSortedInput will match the original condition of
`getStateSpecsCount() > 0`. I'm not sure the additional complexity of another
PTransformMatcher, which would then have to be incorporated into every runner
that supports the annotation, is worth it. WDYT?
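To illustrate why ordering implies state in the streaming case, here is a minimal plain-Java sketch. It is not Beam's implementation; `WatermarkSorter` and its methods are hypothetical names. Elements that arrive out of order must be buffered until the watermark guarantees that no earlier element can still arrive, and that buffer is state someone has to keep.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration, not Beam API: sorts out-of-order event timestamps
// by buffering them until the watermark passes them.
final class WatermarkSorter {
  // Buffered timestamps, ordered ascending; this buffer is the "state".
  private final PriorityQueue<Long> buffer = new PriorityQueue<>();

  /** Buffers an event timestamp; nothing can be emitted on arrival alone. */
  void add(long eventTimestamp) {
    buffer.add(eventTimestamp);
  }

  /** Emits, in ascending order, every buffered timestamp at or before the watermark. */
  List<Long> advanceWatermark(long watermark) {
    List<Long> out = new ArrayList<>();
    while (!buffer.isEmpty() && buffer.peek() <= watermark) {
      out.add(buffer.poll());
    }
    return out;
  }

  /** Number of timestamps still held back, waiting for the watermark. */
  int buffered() {
    return buffer.size();
  }
}
```

Adding 30, 10 and 20 and then advancing the watermark to 20 releases 10 and 20 in order while 30 stays buffered. Even when the DoFn itself declares no state, something must hold this buffer, which is the reasoning behind treating the annotation as implying statefulness.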
##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -2572,6 +2572,49 @@ public void testTwoRequiresTimeSortedInputWithLateData() {
false);
}
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesRequiresTimeSortedInput.class,
+ UsesStrictTimerOrdering.class,
+ UsesTestStream.class
+ })
+ public void testRequiresTimeSortedInputWithStatelessDoFn() {
+ // generate list long enough to rule out random shuffle in sorted order
+ int numElements = 1000;
+ List<Long> eventStamps =
+ LongStream.range(0, numElements)
+ .mapToObj(i -> numElements - i)
+ .collect(Collectors.toList());
+ TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());
+ for (Long stamp : eventStamps) {
+ stream = stream.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+ }
+ testTimeSortedInputStateless(
+ numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+ }
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
Review comment:
I updated the javadoc, thanks. The sentence _Note that this annotation
makes sense only for stateful ParDos, because outcome of stateless functions
cannot depend on the ordering._ was replaced, because it was built on a false
premise: even a stateless DoFn can require ordering when it uses some form of
"implicit" state, such as an RPC, an internal clock, or a similar mechanism.
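A minimal plain-Java sketch of the "implicit state" case, assuming hypothetical names throughout: a `HashMap` wrapped in `ExternalStore` stands in for a remote service reached over RPC. The `delta` function keeps no state of its own, yet its output depends on the order in which elements arrive.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical example: delta() has no fields of its own, but its output still
// depends on input order because it reads and writes an external store.
final class ImplicitStateExample {
  // Stand-in for a remote key-value service (e.g. one reached via RPC).
  static final class ExternalStore {
    private final Map<String, Long> values = new HashMap<>();

    /** Stores the new value and returns the previous one, or null if none. */
    Long getAndSet(String key, long value) {
      return values.put(key, value);
    }
  }

  /** Emits the difference from the previously stored value, or the value itself. */
  static long delta(ExternalStore store, String key, long value) {
    Long previous = store.getAndSet(key, value);
    return previous == null ? value : value - previous;
  }

  /** Runs delta() over the inputs in the given order, with a fresh store per call. */
  static List<Long> process(List<Long> inputs) {
    ExternalStore store = new ExternalStore();
    List<Long> out = new ArrayList<>();
    for (long v : inputs) {
      out.add(delta(store, "key", v));
    }
    return out;
  }
}
```

Processing 1, 2, 3 yields 1, 1, 1, while the same elements in the order 3, 1, 2 yield 3, -2, 1, so such a DoFn can legitimately need ordered input despite declaring no state.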
----------------------------------------------------------------
This is an automated message from the Apache Git Service.