lukecwik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r443789083



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
##########
@@ -510,15 +509,13 @@ public RawUnionValue map(T o) throws Exception {
 
       Coder keyCoder = null;
       KeySelector<WindowedValue<InputT>, ?> keySelector = null;
-      boolean stateful = false;
-      DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-      if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) {
+      boolean stateful = DoFnSignatures.isStateful(doFn);
+      if (stateful) {
         // Based on the fact that the signature is stateful, DoFnSignatures ensures
         // that it is also keyed
         keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
         keySelector = new KvToByteBufferKeySelector(keyCoder);
         inputDataStream = inputDataStream.keyBy(keySelector);
-        stateful = true;
       } else if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {

Review comment:
       This is an example where the way a runner chooses to execute a transform is what makes it "stateful".
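To make the distinction concrete, here is a plain-Java sketch (no Beam dependency) that separates what a DoFn declares from how a runner decides to execute it. `StatefulnessModel`, `DoFnInfo` and both predicates are hypothetical names, and the sketch assumes, as the comment implies, that the translator also treats the splittable `ProcessFn` branch as stateful even though that DoFn declares no state or timers of its own.

```java
import java.util.List;

/**
 * Hypothetical model, not Beam API: separates declared statefulness from the
 * runner's choice of how to execute a DoFn.
 */
final class StatefulnessModel {

  /** Hypothetical stand-in for what a runner's translator inspects about a DoFn. */
  record DoFnInfo(List<String> stateIds, List<String> timerIds, boolean isSplittableProcessFn) {}

  /** Declared statefulness: the DoFn itself declares state or timers. */
  static boolean declaresStateOrTimers(DoFnInfo fn) {
    return !fn.stateIds().isEmpty() || !fn.timerIds().isEmpty();
  }

  /**
   * Execution statefulness: the runner keys the input either because the DoFn declares
   * state or timers, or because of a runner-internal choice (assumed here to be the
   * splittable ProcessFn branch).
   */
  static boolean runnerExecutesKeyed(DoFnInfo fn) {
    return declaresStateOrTimers(fn) || fn.isSplittableProcessFn();
  }

  public static void main(String[] args) {
    DoFnInfo userFn = new DoFnInfo(List.of("buffer"), List.of(), false);
    DoFnInfo processFn = new DoFnInfo(List.of(), List.of(), true);
    // prints "true true": declared and executed statefully
    System.out.println(declaresStateOrTimers(userFn) + " " + runnerExecutesKeyed(userFn));
    // prints "false true": stateful only because of the runner's execution choice
    System.out.println(declaresStateOrTimers(processFn) + " " + runnerExecutesKeyed(processFn));
  }
}
```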

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
##########
@@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform
 
   public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
     ParDoPayload payload = getParDoPayload(transform);
-    return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0;
+    return payload.getStateSpecsCount() > 0
+        || payload.getTimerFamilySpecsCount() > 0
+        || payload.getRequiresTimeSortedInput();

Review comment:
       I don't believe we reached consensus on the mailing list (ML) to have "usesStateOrTimers" reflect how a runner executes the DoFn, since a runner may choose not to implement RequiresTimeSortedInput using a wrapping stateful DoFn.
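A minimal plain-Java sketch of the behavioural difference, assuming only the three payload fields read in the diff matter. `PayloadFields` is a hypothetical stand-in for `ParDoPayload`, not the generated proto class, and `before`/`after` are illustrative names for the two versions of the check. For a DoFn with no state or timers that requests time-sorted input, the original check returns false while the proposed one returns true.

```java
/**
 * Hypothetical stand-in (not the real ParDoPayload): the three fields that the two
 * versions of ParDoTranslation.usesStateOrTimers inspect.
 */
record PayloadFields(int stateSpecsCount, int timerFamilySpecsCount, boolean requiresTimeSortedInput) {}

final class UsesStateOrTimersComparison {

  /** Mirrors the original check: only declared state and timer families count. */
  static boolean before(PayloadFields p) {
    return p.stateSpecsCount() > 0 || p.timerFamilySpecsCount() > 0;
  }

  /** Mirrors the proposed check: RequiresTimeSortedInput alone also counts. */
  static boolean after(PayloadFields p) {
    return p.stateSpecsCount() > 0
        || p.timerFamilySpecsCount() > 0
        || p.requiresTimeSortedInput();
  }

  public static void main(String[] args) {
    // A DoFn with no state or timers that is annotated with @RequiresTimeSortedInput.
    PayloadFields statelessSorted = new PayloadFields(0, 0, true);
    System.out.println(before(statelessSorted)); // prints "false"
    System.out.println(after(statelessSorted)); // prints "true"
  }
}
```

The question raised above is whether that second answer should describe the DoFn itself or one particular runner's way of executing it.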

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -2572,6 +2572,49 @@ public void testTwoRequiresTimeSortedInputWithLateData() {
           false);
     }
 
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesRequiresTimeSortedInput.class,
+      UsesStrictTimerOrdering.class,
+      UsesTestStream.class
+    })
+    public void testRequiresTimeSortedInputWithStatelessDoFn() {
+      // generate list long enough to rule out random shuffle in sorted order
+      int numElements = 1000;
+      List<Long> eventStamps =
+          LongStream.range(0, numElements)
+              .mapToObj(i -> numElements - i)
+              .collect(Collectors.toList());
+      TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());
+      for (Long stamp : eventStamps) {
+        stream = stream.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+      }
+      testTimeSortedInputStateless(
+          numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,

Review comment:
       According to its javadoc, RequiresTimeSortedInput shouldn't have any meaningful effect on stateless DoFns, so I'm confused as to why this should be allowed rather than rejected as an error during DoFn signature validation.

   If this PR changes the semantics described in that javadoc, please update the javadoc as part of this PR.
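If the annotation is meant to be meaningless for stateless DoFns, the rejection could look roughly like the following. This is a hypothetical, dependency-free model, not Beam's `DoFnSignatures` code; `TimeSortedInputValidation.validate` and its parameters are illustrative names only, and a real check would read the state and timer declarations from the parsed DoFn signature.

```java
import java.util.Set;

/**
 * Hypothetical sketch (not Beam's DoFnSignatures): reject @RequiresTimeSortedInput
 * when the DoFn declares neither state nor timers.
 */
final class TimeSortedInputValidation {

  static void validate(
      String doFnName, Set<String> stateIds, Set<String> timerIds, boolean requiresTimeSortedInput) {
    boolean stateful = !stateIds.isEmpty() || !timerIds.isEmpty();
    if (requiresTimeSortedInput && !stateful) {
      throw new IllegalArgumentException(
          doFnName + " is annotated with @RequiresTimeSortedInput but declares no state or timers");
    }
  }

  public static void main(String[] args) {
    // Accepted: the DoFn declares a state cell, so sorted input is meaningful.
    validate("StatefulFn", Set.of("buffer"), Set.of(), true);
    try {
      // Rejected: nothing in a stateless DoFn can observe the ordering.
      validate("StatelessFn", Set.of(), Set.of(), true);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```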




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

