lukecwik commented on a change in pull request #11808:
URL: https://github.com/apache/beam/pull/11808#discussion_r443789083
##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
##########
@@ -510,15 +509,13 @@ public RawUnionValue map(T o) throws Exception {
       Coder keyCoder = null;
       KeySelector<WindowedValue<InputT>, ?> keySelector = null;
-      boolean stateful = false;
-      DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass());
-      if (signature.stateDeclarations().size() > 0 || signature.timerDeclarations().size() > 0) {
+      boolean stateful = DoFnSignatures.isStateful(doFn);
+      if (stateful) {
         // Based on the fact that the signature is stateful, DoFnSignatures ensures
         // that it is also keyed
         keyCoder = ((KvCoder) input.getCoder()).getKeyCoder();
         keySelector = new KvToByteBufferKeySelector(keyCoder);
         inputDataStream = inputDataStream.keyBy(keySelector);
-        stateful = true;
       } else if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) {

Review comment:
This is an example where a runner's choice of how to execute a transform is what makes that transform "stateful".

##########
File path: runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
##########
@@ -739,7 +737,9 @@ private static ParDoPayload getParDoPayload(RunnerApi.PTransform parDoPTransform
   public static boolean usesStateOrTimers(AppliedPTransform<?, ?, ?> transform) throws IOException {
     ParDoPayload payload = getParDoPayload(transform);
-    return payload.getStateSpecsCount() > 0 || payload.getTimerFamilySpecsCount() > 0;
+    return payload.getStateSpecsCount() > 0
+        || payload.getTimerFamilySpecsCount() > 0
+        || payload.getRequiresTimeSortedInput();

Review comment:
I don't believe we reached consensus on the mailing list (ML) that "usesStateOrTimers" should represent how a runner executes the DoFn, since a runner may choose not to implement RequiresTimeSortedInput by wrapping the DoFn in a stateful DoFn.
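The distinction drawn in the comment above can be sketched as two separate predicates: one that describes only what the DoFn declares, and one that describes a runner's execution choice. This is a minimal sketch, assuming a hypothetical `PayloadView` class that mirrors just the three `ParDoPayload` fields used in the diff; `PayloadView`, `StatefulnessChecks`, `executesStatefully`, and the `sortsViaStatefulWrapper` flag are illustrative names, not Beam APIs.

```java
/**
 * Hypothetical, simplified stand-in for the ParDoPayload fields discussed in the review.
 * This is NOT the Beam RunnerApi type.
 */
final class PayloadView {
  final int stateSpecsCount;
  final int timerFamilySpecsCount;
  final boolean requiresTimeSortedInput;

  PayloadView(int stateSpecsCount, int timerFamilySpecsCount, boolean requiresTimeSortedInput) {
    this.stateSpecsCount = stateSpecsCount;
    this.timerFamilySpecsCount = timerFamilySpecsCount;
    this.requiresTimeSortedInput = requiresTimeSortedInput;
  }
}

/** Hypothetical helpers illustrating the separation; not part of Beam. */
final class StatefulnessChecks {
  private StatefulnessChecks() {}

  /** Reflects only what the DoFn itself declares: state specs or timer family specs. */
  static boolean usesStateOrTimers(PayloadView payload) {
    return payload.stateSpecsCount > 0 || payload.timerFamilySpecsCount > 0;
  }

  /**
   * Runner-side decision (hypothetical): a runner that implements time-sorted input by
   * wrapping the DoFn in a stateful DoFn must execute it statefully; a runner that sorts
   * input some other way does not have to.
   */
  static boolean executesStatefully(PayloadView payload, boolean sortsViaStatefulWrapper) {
    return usesStateOrTimers(payload)
        || (payload.requiresTimeSortedInput && sortsViaStatefulWrapper);
  }
}
```

With this split, a runner that sorts input without a stateful wrapper would not see a stateless, time-sorted DoFn reported as using state or timers, which is the concern raised about extending `usesStateOrTimers`.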
##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -2572,6 +2572,49 @@ public void testTwoRequiresTimeSortedInputWithLateData() {
         false);
   }
+  @Test
+  @Category({
+    ValidatesRunner.class,
+    UsesStatefulParDo.class,
+    UsesRequiresTimeSortedInput.class,
+    UsesStrictTimerOrdering.class,
+    UsesTestStream.class
+  })
+  public void testRequiresTimeSortedInputWithStatelessDoFn() {
+    // generate list long enough to rule out random shuffle in sorted order
+    int numElements = 1000;
+    List<Long> eventStamps =
+        LongStream.range(0, numElements)
+            .mapToObj(i -> numElements - i)
+            .collect(Collectors.toList());
+    TestStream.Builder<Long> stream = TestStream.create(VarLongCoder.of());
+    for (Long stamp : eventStamps) {
+      stream = stream.addElements(TimestampedValue.of(stamp, Instant.ofEpochMilli(stamp)));
+    }
+    testTimeSortedInputStateless(
+        numElements, pipeline.apply(stream.advanceWatermarkToInfinity()));
+  }
+
+  @Test
+  @Category({
+    ValidatesRunner.class,
+    UsesStatefulParDo.class,

Review comment:
According to its javadoc, RequiresTimeSortedInput shouldn't have any meaningful effect on stateless DoFns, so I'm confused about why this should be allowed rather than rejected as an error during DoFn signature validation. If this PR changes that behavior, please update the javadoc as part of this PR.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
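For the last review comment on `testRequiresTimeSortedInputWithStatelessDoFn`, a signature-time check that rejects `RequiresTimeSortedInput` on a stateless DoFn might look roughly like the following. This is a sketch under stated assumptions: `SignatureSummary` and `SignatureValidation` are hypothetical stand-ins for Beam's `DoFnSignature` and the validation in `DoFnSignatures`, and "stateless" is approximated as declaring no state and no timers.

```java
import java.util.Objects;

/** Hypothetical summary of a DoFn signature; NOT Beam's DoFnSignature. */
final class SignatureSummary {
  final String doFnName;
  final int stateDeclarations;
  final int timerDeclarations;
  final boolean requiresTimeSortedInput;

  SignatureSummary(
      String doFnName, int stateDeclarations, int timerDeclarations, boolean requiresTimeSortedInput) {
    this.doFnName = Objects.requireNonNull(doFnName);
    this.stateDeclarations = stateDeclarations;
    this.timerDeclarations = timerDeclarations;
    this.requiresTimeSortedInput = requiresTimeSortedInput;
  }
}

/** Hypothetical validation step illustrating the reviewer's suggestion; not part of Beam. */
final class SignatureValidation {
  private SignatureValidation() {}

  /** Throws if a DoFn requests time-sorted input but declares neither state nor timers. */
  static void validate(SignatureSummary sig) {
    boolean stateful = sig.stateDeclarations > 0 || sig.timerDeclarations > 0;
    if (sig.requiresTimeSortedInput && !stateful) {
      throw new IllegalArgumentException(
          String.format(
              "%s requests time-sorted input but declares no state or timers; "
                  + "RequiresTimeSortedInput has no meaningful effect on a stateless DoFn",
              sig.doFnName));
    }
  }
}
```

Failing at signature validation surfaces the mistake when the pipeline is constructed, instead of leaving each runner to decide what a time-sorted stateless DoFn means.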