lukecwik commented on a change in pull request #9190:
URL: https://github.com/apache/beam/pull/9190#discussion_r430620228
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -3488,6 +3499,158 @@ public void onTimer(OutputReceiver<String> r) {
pipeline.run();
}
+ /** A test makes sure that an event time timers are correctly ordered. */
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ UsesTestStream.class,
+ UsesStatefulParDo.class,
+ UsesStrictTimerOrdering.class
+ })
+ public void testEventTimeTimerOrdering() throws Exception {
+ final int numTestElements = 100;
+ final Instant now = new Instant(1500000000000L);
+ TestStream.Builder<KV<String, String>> builder =
+ TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+ .advanceWatermarkTo(new Instant(0));
+
+ for (int i = 0; i < numTestElements; i++) {
+ builder = builder.addElements(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i)));
+ builder = builder.advanceWatermarkTo(now.plus(i / 10 * 10));
+ }
+
+ testEventTimeTimerOrderingWithInputPTransform(
+ now, numTestElements, builder.advanceWatermarkToInfinity());
+ }
+
+ /** A test makes sure that an event time timers are correctly ordered using Create transform. */
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesTimersInParDo.class,
+ UsesStatefulParDo.class,
+ UsesStrictTimerOrdering.class
+ })
+ public void testEventTimeTimerOrderingWithCreate() throws Exception {
+ final int numTestElements = 100;
+ final Instant now = new Instant(1500000000000L);
+
+ List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>();
+ for (int i = 0; i < numTestElements; i++) {
+ elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i)));
+ }
+
+ testEventTimeTimerOrderingWithInputPTransform(
Review comment:
@je-ik @y1chi @kennknowles
The elements in the PCollection produced by the Create transform are
considered unordered, so the stateful DoFn could see `(dummy, 1)` before it sees
`(dummy, 0)`. Doesn't the DoFn need to be marked with
`@RequiresTimeSortedInput` in this case?
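
To make the ordering concern concrete, here is a minimal plain-Java sketch. It is a simulation only and does not use any Beam API. The class `TimeSortedInputSketch` and its helpers are hypothetical names introduced for illustration. It builds the same 100 timestamped values as the test, delivers element 1 before element 0, and shows that arrival order matches timestamp order again only after sorting by timestamp. In Beam, the corresponding request would be made by annotating the `@ProcessElement` method with `@RequiresTimeSortedInput`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Plain-Java simulation only; nothing here is Beam API and all names are hypothetical.
class TimeSortedInputSketch {

  /** A value paired with its event-time timestamp in milliseconds. */
  static final class Stamped {
    final String value;
    final long timestampMillis;

    Stamped(String value, long timestampMillis) {
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  /** Builds n elements like the test does: value i stamped at nowMillis + i. */
  static List<Stamped> buildElements(long nowMillis, int n) {
    List<Stamped> elements = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      elements.add(new Stamped("" + i, nowMillis + i));
    }
    return elements;
  }

  /** Returns true if timestamps never decrease in arrival order. */
  static boolean arrivesInTimestampOrder(List<Stamped> arrivals) {
    for (int i = 1; i < arrivals.size(); i++) {
      if (arrivals.get(i).timestampMillis < arrivals.get(i - 1).timestampMillis) {
        return false;
      }
    }
    return true;
  }

  /** Returns a copy sorted by timestamp, standing in for runner-side time sorting. */
  static List<Stamped> sortByTimestamp(List<Stamped> arrivals) {
    List<Stamped> sorted = new ArrayList<>(arrivals);
    sorted.sort(Comparator.comparingLong((Stamped s) -> s.timestampMillis));
    return sorted;
  }

  public static void main(String[] args) {
    List<Stamped> arrivals = buildElements(1500000000000L, 100);
    // Simulate the case from the comment: element 1 is delivered before element 0.
    Collections.swap(arrivals, 0, 1);
    System.out.println("in order as delivered: " + arrivesInTimestampOrder(arrivals));
    System.out.println(
        "in order after sorting: " + arrivesInTimestampOrder(sortByTimestamp(arrivals)));
  }
}
```

Running `main` reports `false` for the delivered order and `true` after sorting. The sketch sorts a fully buffered list, which is a simplification; it says nothing about how any particular runner implements time-sorted input.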