reuvenlax commented on code in PR #38363:
URL: https://github.com/apache/beam/pull/38363#discussion_r3243737265


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java:
##########
@@ -3678,6 +3679,154 @@ public void processElement(
       pipeline.run();
     }
 
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesSideInputs.class,
+      UsesSideInputsInTimer.class,
+      UsesTestStream.class,
+      UsesTimersInParDo.class,
+      UsesTriggeredSideInputs.class,
+      UsesOnWindowExpiration.class
+    })
+    public void testTimerSideInput() {
+      // SideInput tag id
+      final String sideInputTag1 = "tag1";
+
+      final PCollectionView<Integer> sideInput =
+          pipeline
+              .apply("CreateSideInput1", Create.of(2))
+              .apply("ViewSideInput1", View.asSingleton());
+
+      DoFn<KV<Integer, Integer>, KV<Integer, Integer>> doFn =
+          new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() {
+            @TimerId("timer")
+            private final TimerSpec timerSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @StateId("foo")
+            private final StateSpec<ValueState<Integer>> stateSpec = 
StateSpecs.value();
+
+            @ProcessElement
+            public void process(@Timestamp Instant ts, @TimerId("timer") Timer 
timer) {
+              timer.align(Duration.standardSeconds(10)).setRelative();
+            }
+
+            @OnTimer("timer")
+            public void onTimer(
+                OutputReceiver<KV<Integer, Integer>> o,
+                @DoFn.SideInput(sideInputTag1) Integer sideInput,
+                @Key Integer key) {
+              o.output(KV.of(key, sideInput));
+            }
+
+            @OnWindowExpiration
+            public void onWindowExpiration(
+                @DoFn.SideInput(sideInputTag1) Integer sideInput,
+                OutputReceiver<KV<Integer, Integer>> o,
+                @Key Integer key) {
+              o.output(KV.of(key, sideInput));
+            }
+          };
+
+      final int numTestElements = 10;
+      final Instant now = new Instant(0);
+      TestStream.Builder<KV<Integer, Integer>> builder =
+          TestStream.create(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()))
+              .advanceWatermarkTo(new Instant(0));
+
+      for (int i = 0; i < numTestElements; i++) {
+        builder =
+            builder.addElements(
+                TimestampedValue.of(KV.of(i % 2, i), 
now.plus(Duration.millis(i * 1000))));
+        if ((i + 1) % 10 == 0) {
+          builder = builder.advanceWatermarkTo(now.plus(Duration.millis((i + 
1) * 1000)));
+        }
+      }
+      List<KV<Integer, Integer>> expected =
+          IntStream.rangeClosed(0, 1)
+              .boxed()
+              .flatMap(i -> ImmutableList.of(KV.of(i, 2), KV.of(i, 
2)).stream())
+              .collect(Collectors.toList());
+
+      PCollection<KV<Integer, Integer>> output =
+          pipeline
+              .apply(builder.advanceWatermarkToInfinity())
+              .apply(ParDo.of(doFn).withSideInput(sideInputTag1, sideInput));
+      PAssert.that(output).containsInAnyOrder(expected);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesSideInputs.class,
+      UsesSideInputsInTimer.class,
+      UsesTimersInParDo.class,
+      UsesTriggeredSideInputs.class
+    })
+    public void testSideInputNotReadyTimer() {
+      final String sideInputTag = "tag1";
+
+      // Create a side input that is delayed by 15 seconds using Thread.sleep
+      DoFn<KV<String, String>, String> delayFn =
+          new DoFn<KV<String, String>, String>() {
+            @ProcessElement
+            public void process(OutputReceiver<String> o) throws 
InterruptedException {
+              Thread.sleep(java.time.Duration.ofSeconds(15).toMillis());
+              o.output("side-value");
+            }
+          };

Review Comment:
   TestStream doesn't work on the Dataflow v1 runner...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to