autophagy commented on code in PR #28326:
URL: https://github.com/apache/flink/pull/28326#discussion_r3382256359


##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2465,6 +2465,115 @@ void testStateMutation() throws Exception {
 {{< /tab >}}
 {{< /tabs >}}
 
+#### Testing with Timers and Context
+
+The harness supports the `Context` parameter, timer registration via 
`TimeContext`, and `onTimer`
+callbacks. Use `.withOnTimeColumn()` to configure the event time column and 
`.setWatermark()` to
+advance watermarks and fire eligible timers.
+
+{{< tabs "timer-testing" >}}
+{{< tab "Java" >}}
+```java
+// A PTF that registers a named timer 5 seconds after each event, and emits 
when it fires.
+@DataTypeHint("ROW<message STRING>")
+public class TimerPTF extends ProcessTableFunction<Row> {
+  public void eval(
+      Context ctx,
+      @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, 
ArgumentTrait.REQUIRE_ON_TIME})
+          Row input) {
+    String name = input.getFieldAs("name");
+    TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
+    timeCtx.registerOnTime("timeout-" + name, 
timeCtx.time().plus(Duration.ofSeconds(5)));
+    collect(Row.of("registered-" + name));
+  }
+
+  public void onTimer(OnTimerContext ctx) {
+    collect(Row.of("timer-fired-" + ctx.currentTimer()));
+  }
+}
+
+@Test
+void testTimerRegistrationAndFiring() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(TimerPTF.class)
+          .withTableArgument("input",
+              DataTypes.of("ROW<partition STRING, name STRING, ts 
TIMESTAMP(3)>"))
+          .withPartitionBy("input", "partition")
+          .withOnTimeColumn("ts")
+          .build()) {
+
+    harness.processElement(Row.of("P1", "Alice", LocalDateTime.of(2025, 1, 1, 
0, 0, 1)));
+
+    // Verify the timer was registered
+    assertThat(harness.getPendingTimers()).hasSize(1);
+    
assertThat(harness.getPendingTimers().get(0).getName()).isEqualTo("timeout-Alice");
+
+    // Advance watermark past the timer's timestamp to fire it
+    harness.clearOutput();

Review Comment:
   It'll have 1 row in it (we collect one upon timer registration). But i'll 
assert that its there and then clear, just so its not confusing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to