autophagy commented on code in PR #28326:
URL: https://github.com/apache/flink/pull/28326#discussion_r3475684066


##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -2240,6 +2240,19 @@ void testWatermarkAdvancesWithoutTimers() throws Exception {
         }
     }
 
+    @Test
+    void testWatermarkAdvancesWithoutOnTimeColumn() throws Exception {
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+                        .build()) {
+
+            harness.processElement(Row.of(42));
+            harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 10));

Review Comment:
   Yep, this is something the PTF framework supports! There's even a test PTF for it in the live tests:
   ```
   public static class OptionalOnTimeFunction extends AppendProcessTableFunctionBase {
       public void eval(Context ctx, @ArgumentHint(SET_SEMANTIC_TABLE) Row r) {
           final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
           collectEvalEvent(timeCtx, r);
           Long wm = timeCtx.currentWatermark();
           // Register at wm+1 to always target the immediate next watermark: the timer fires
           // exactly once per watermark advance, and each new row re-registers the timer for the
           // following watermark step, demonstrating repeated timer re-registration.
           long timer = wm == null || wm < 0 ? 1 : wm + 1;
           collectCreateTimer(timeCtx, "t", timer);
       }

       public void onTimer(OnTimerContext ctx) {
           final TimeContext<Long> timeCtx = ctx.timeContext(Long.class);
           collectOnTimerEvent(ctx);
       }
   }
   ```
   
   The harness should already handle this correctly, but I don't think we have an explicit test case for it. I'll add one!
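
   For reference, the timer arithmetic in that snippet can be sketched in isolation. This is only a standalone illustration with no Flink dependencies; `TimerTargetSketch` and `nextTimer` are hypothetical names, not part of the PTF framework or the harness:

   ```java
   public class TimerTargetSketch {

       // Hypothetical helper mirroring the ternary in OptionalOnTimeFunction#eval:
       // with no usable watermark (null or negative) it targets timestamp 1,
       // otherwise it targets the timestamp right after the current watermark.
       static long nextTimer(Long wm) {
           return wm == null || wm < 0 ? 1 : wm + 1;
       }

       public static void main(String[] args) {
           System.out.println(nextTimer(null)); // no watermark received yet
           System.out.println(nextTimer(-5L));  // negative watermark
           System.out.println(nextTimer(0L));   // watermark at 0
           System.out.println(nextTimer(10L));  // watermark at 10
       }
   }
   ```

   Under these assumptions, every watermark advance is followed by a timer one step ahead of it, which is the re-registration behavior described in the code comment above.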



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
