je-ik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r444722211



##########
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
##########
@@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform
 
   public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
     ParDoPayload payload = getParDoPayload(transform);
-    return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0;
+    return payload.getStateSpecsCount() > 0
+        || payload.getTimerFamilySpecsCount() > 0
+        || payload.getRequiresTimeSortedInput();

Review comment:
       That is correct. I'm aware there was no clear consensus, so I tried to
minimize the changes needed to fix the underlying issue. I think that
RequiresTimeSortedInput implies "statefulness" (which is what this method
actually checks), because there is no way to sort data without state, at least
in the generic streaming case. The alternative would be to introduce a new
`PTransformMatcher`, but it would only be needed for "stateless" ordered DoFns,
which, although legitimate, are something of a corner case. The vast majority of
uses of RequiresTimeSortedInput will already match the original condition
`getStateSpecsCount() > 0`. I'm not sure the added complexity of another
PTransformMatcher, plus incorporating it into every runner that supports the
annotation, is worth it. WDYT?
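
To make the "no sorting without state" argument concrete, here is a minimal plain-Java sketch (not Beam code; `SortBufferSketch` and `WatermarkSortBuffer` are hypothetical names). It assumes each element carries only an event timestamp and that a watermark guarantees no earlier element can still arrive. Releasing elements in order then requires buffering everything above the watermark, and that buffer is the kind of per-key state a runner would have to keep for a `@RequiresTimeSortedInput` DoFn, even if the DoFn declares no `StateSpec` of its own.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration, not Beam code: sorting a stream by event time
// forces us to buffer elements until the watermark passes them.
public class SortBufferSketch {

  /** Buffers timestamps and releases them in ascending order once the watermark passes them. */
  static final class WatermarkSortBuffer {
    // This queue is the "state" that sorting cannot avoid.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>(Comparator.naturalOrder());

    void add(long timestamp) {
      buffer.add(timestamp);
    }

    /** Emits, in ascending order, every buffered element with timestamp <= watermark. */
    List<Long> advanceWatermark(long watermark) {
      List<Long> out = new ArrayList<>();
      while (!buffer.isEmpty() && buffer.peek() <= watermark) {
        out.add(buffer.poll());
      }
      return out;
    }

    /** Number of elements still held back because the watermark has not reached them. */
    int buffered() {
      return buffer.size();
    }
  }

  public static void main(String[] args) {
    WatermarkSortBuffer sorter = new WatermarkSortBuffer();
    // Elements arrive in reverse event-time order, like the TestStream in the test below.
    for (long t = 5; t >= 1; t--) {
      sorter.add(t);
    }
    System.out.println(sorter.advanceWatermark(3));              // [1, 2, 3]
    System.out.println(sorter.buffered());                       // 2
    System.out.println(sorter.advanceWatermark(Long.MAX_VALUE)); // [4, 5]
  }
}
```

Nothing can be emitted before the watermark passes it, so without such a buffer a reversed input stream could never be re-ordered.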

##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -2572,6 +2572,49 @@ public void testTwoRequiresTimeSortedInputWithLateData() {
           false);
     }
 
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class,
+      UsesTestStream.class
+    })
+    public void testRequiresTimeSortedInputWithStatelessDoFn() {
+      // generate a list long enough to rule out a random shuffle ending up in sorted order
+      int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        stream = stream.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+      }
+      testTimeSortedInputStateless(
+          numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,

Review comment:
       I updated the javadoc, thanks. The sentence _Note that this annotation
makes sense only for stateful ParDos, because outcome of stateless functions
cannot depend on the ordering._ was replaced because it was built on a false
premise: even a stateless DoFn can require ordering when it uses some form of
"implicit" state, such as an RPC, an internal clock, or a similar mechanism.
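
As an illustration of the "implicit" state case, the following plain-Java sketch (not Beam code; `ImplicitStateSketch`, `ExternalStore` and `upsertLatest` are hypothetical names) models a per-element function that holds no state of its own but writes each reading to an external store with last-write-wins semantics, standing in for an RPC. The same readings processed in two different orders leave different values in the store, so the outcome depends on ordering even though the function is stateless from the runner's point of view.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration, not Beam code: a function with no declared state
// whose observable outcome still depends on input order.
public class ImplicitStateSketch {

  /** Stand-in for a remote key-value service with last-write-wins semantics. */
  static final class ExternalStore {
    final Map<String, Long> values = new HashMap<>();

    void put(String key, long value) {
      values.put(key, value);
    }
  }

  /** "Stateless" per-element logic: it only forwards each reading to the external store. */
  static void upsertLatest(ExternalStore store, String key, long reading) {
    store.put(key, reading);
  }

  /** Processes readings in arrival order and returns what the external store ends up holding. */
  static long finalValue(List<Long> readingsInArrivalOrder) {
    ExternalStore store = new ExternalStore();
    for (long reading : readingsInArrivalOrder) {
      upsertLatest(store, "sensor-1", reading);
    }
    return store.values.get("sensor-1");
  }

  public static void main(String[] args) {
    // Same readings (event time == value, as in the test), two different arrival orders.
    System.out.println(finalValue(Arrays.asList(1L, 2L, 3L))); // 3: the latest reading wins
    System.out.println(finalValue(Arrays.asList(3L, 1L, 2L))); // 2: a stale reading wins
  }
}
```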
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
