fhueske commented on code in PR #28326:
URL: https://github.com/apache/flink/pull/28326#discussion_r3371943350


##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2465,6 +2465,115 @@ void testStateMutation() throws Exception {
 {{< /tab >}}
 {{< /tabs >}}
 
+#### Testing with Timers and Context
+
+The harness supports the `Context` parameter, timer registration via 
`TimeContext`, and `onTimer`
+callbacks. Use `.withOnTimeColumn()` to configure the event time column and 
`.setWatermark()` to
+advance watermarks and fire eligible timers.
+
+{{< tabs "timer-testing" >}}
+{{< tab "Java" >}}
+```java
+// A PTF that registers a named timer 5 seconds after each event, and emits 
when it fires.
+@DataTypeHint("ROW<message STRING>")
+public class TimerPTF extends ProcessTableFunction<Row> {
+  public void eval(
+      Context ctx,
+      @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, 
ArgumentTrait.REQUIRE_ON_TIME})
+          Row input) {
+    String name = input.getFieldAs("name");
+    TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
+    timeCtx.registerOnTime("timeout-" + name, 
timeCtx.time().plus(Duration.ofSeconds(5)));
+    collect(Row.of("registered-" + name));
+  }
+
+  public void onTimer(OnTimerContext ctx) {
+    collect(Row.of("timer-fired-" + ctx.currentTimer()));
+  }
+}
+
+@Test
+void testTimerRegistrationAndFiring() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(TimerPTF.class)
+          .withTableArgument("input",
+              DataTypes.of("ROW<partition STRING, name STRING, ts 
TIMESTAMP(3)>"))
+          .withPartitionBy("input", "partition")
+          .withOnTimeColumn("ts")
+          .build()) {
+
+    harness.processElement(Row.of("P1", "Alice", LocalDateTime.of(2025, 1, 1, 
0, 0, 1)));
+
+    // Verify the timer was registered
+    assertThat(harness.getPendingTimers()).hasSize(1);
+    
assertThat(harness.getPendingTimers().get(0).getName()).isEqualTo("timeout-Alice");
+
+    // Advance watermark past the timer's timestamp to fire it
+    harness.clearOutput();
+    harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));
+
+    assertThat(harness.getOutput())
+        .containsExactly(
+            Row.of("P1", "timer-fired-timeout-Alice", LocalDateTime.of(2025, 
1, 1, 0, 0, 6)));
+
+    assertThat(harness.getPendingTimers()).isEmpty();
+    assertThat(harness.getFiredTimers()).hasSize(1);
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Timers with State**: State persisted during `eval()` is accessible in 
`onTimer()`:
+
+{{< tabs "timer-state-testing" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<message STRING>")
+public class TimerWithStatePTF extends ProcessTableFunction<Row> {
+  public static class CounterState {
+    public long count = 0L;
+  }
+
+  public void eval(
+      Context ctx,
+      @StateHint CounterState state,
+      @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, 
ArgumentTrait.REQUIRE_ON_TIME})
+          Row input) {
+    state.count++;
+    TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
+    timeCtx.registerOnTime("check", 
timeCtx.time().plus(Duration.ofSeconds(5)));
+  }
+
+  public void onTimer(OnTimerContext ctx, @StateHint CounterState state) {
+    collect(Row.of("count=" + state.count));
+  }
+}
+
+@Test
+void testTimerWithState() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(TimerWithStatePTF.class)
+          .withTableArgument("input",
+              DataTypes.of("ROW<partition STRING, ts TIMESTAMP(3)>"))
+          .withPartitionBy("input", "partition")
+          .withOnTimeColumn("ts")
+          .build()) {
+
+    harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 
1)));
+    harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 
1)));
+    harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 
1)));

Review Comment:
   ```suggestion
       harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 
2)));
   ```
   Show that two timers can be fired by a single watermark advancement?



##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2465,6 +2465,115 @@ void testStateMutation() throws Exception {
 {{< /tab >}}
 {{< /tabs >}}
 
+#### Testing with Timers and Context
+
+The harness supports the `Context` parameter, timer registration via 
`TimeContext`, and `onTimer`
+callbacks. Use `.withOnTimeColumn()` to configure the event time column and 
`.setWatermark()` to
+advance watermarks and fire eligible timers.
+
+{{< tabs "timer-testing" >}}
+{{< tab "Java" >}}
+```java
+// A PTF that registers a named timer 5 seconds after each event, and emits 
when it fires.
+@DataTypeHint("ROW<message STRING>")
+public class TimerPTF extends ProcessTableFunction<Row> {
+  public void eval(
+      Context ctx,
+      @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, 
ArgumentTrait.REQUIRE_ON_TIME})
+          Row input) {
+    String name = input.getFieldAs("name");
+    TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
+    timeCtx.registerOnTime("timeout-" + name, 
timeCtx.time().plus(Duration.ofSeconds(5)));
+    collect(Row.of("registered-" + name));
+  }
+
+  public void onTimer(OnTimerContext ctx) {
+    collect(Row.of("timer-fired-" + ctx.currentTimer()));
+  }
+}
+
+@Test
+void testTimerRegistrationAndFiring() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(TimerPTF.class)
+          .withTableArgument("input",
+              DataTypes.of("ROW<partition STRING, name STRING, ts 
TIMESTAMP(3)>"))
+          .withPartitionBy("input", "partition")
+          .withOnTimeColumn("ts")
+          .build()) {
+
+    harness.processElement(Row.of("P1", "Alice", LocalDateTime.of(2025, 1, 1, 
0, 0, 1)));
+
+    // Verify the timer was registered
+    assertThat(harness.getPendingTimers()).hasSize(1);
+    
assertThat(harness.getPendingTimers().get(0).getName()).isEqualTo("timeout-Alice");
+
+    // Advance watermark past the timer's timestamp to fire it
+    harness.clearOutput();
+    harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));
+
+    assertThat(harness.getOutput())
+        .containsExactly(
+            Row.of("P1", "timer-fired-timeout-Alice", LocalDateTime.of(2025, 
1, 1, 0, 0, 6)));
+
+    assertThat(harness.getPendingTimers()).isEmpty();
+    assertThat(harness.getFiredTimers()).hasSize(1);
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Timers with State**: State persisted during `eval()` is accessible in 
`onTimer()`:
+
+{{< tabs "timer-state-testing" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<message STRING>")
+public class TimerWithStatePTF extends ProcessTableFunction<Row> {
+  public static class CounterState {
+    public long count = 0L;
+  }
+
+  public void eval(
+      Context ctx,
+      @StateHint CounterState state,
+      @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, 
ArgumentTrait.REQUIRE_ON_TIME})
+          Row input) {
+    state.count++;
+    TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
+    timeCtx.registerOnTime("check", 
timeCtx.time().plus(Duration.ofSeconds(5)));
+  }
+
+  public void onTimer(OnTimerContext ctx, @StateHint CounterState state) {
+    collect(Row.of("count=" + state.count));
+  }
+}
+
+@Test
+void testTimerWithState() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(TimerWithStatePTF.class)
+          .withTableArgument("input",
+              DataTypes.of("ROW<partition STRING, ts TIMESTAMP(3)>"))
+          .withPartitionBy("input", "partition")
+          .withOnTimeColumn("ts")
+          .build()) {
+
+    harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 
1)));
+    harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 
1)));
+    harness.processElement(Row.of("P1", LocalDateTime.of(2025, 1, 1, 0, 0, 
1)));
+
+    harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 7));
+    assertThat(harness.getOutput())
+        .containsExactly(
+            Row.of("P1", "count=3", LocalDateTime.of(2025, 1, 1, 0, 0, 6)));

Review Comment:
   ```suggestion
       harness.setWatermark(LocalDateTime.of(2025, 1, 1, 0, 0, 8));
       assertThat(harness.getOutput())
           .containsExactly(
               Row.of("P1", "count=2", LocalDateTime.of(2025, 1, 1, 0, 0, 6)),
               Row.of("P1", "count=3", LocalDateTime.of(2025, 1, 1, 0, 0, 7)));
   ```



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {
+        return Stream.concat(
+                        timerManager.getPendingTimers().stream(),
+                        timerManager.getFiredTimers().stream())
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all pending (not yet fired) timers, sorted by timestamp then 
name. */
+    public List<Timer> getPendingTimers() {
+        return timerManager.getPendingTimers();
+    }
+
+    /** Returns all pending timers with the given name. */
+    public List<Timer> getPendingTimers(String timerName) {
+        return timerManager.getPendingTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all timers that have fired, in the order they fired. */
+    public List<Timer> getFiredTimers() {
+        return timerManager.getFiredTimers();
+    }
+
+    /** Returns all fired timers with the given name. */
+    public List<Timer> getFiredTimers(String timerName) {
+        return timerManager.getFiredTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Clears the fired timer history. */
+    public void clearFiredTimers() {
+        timerManager.clearFiredTimers();
+    }
+
+    private void setWatermarkMillis(long millis) throws Exception {
+        checkState(isOpen, "Harness is not open");
+        for (TableArgumentInfo tableArg : 
ArgumentInfo.filterTableArguments(arguments)) {
+            timerManager.setTableWatermark(tableArg.name, millis);
+        }
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void setWatermarkForTableMillis(String tableArgument, long millis) 
throws Exception {
+        checkState(isOpen, "Harness is not open");
+        checkNotNull(tableArgument, "tableArgument must not be null");
+        checkArgument(
+                argumentsByName.get(tableArgument) instanceof 
TableArgumentInfo,
+                "Unknown or non-table argument: %s",
+                tableArgument);
+        timerManager.setTableWatermark(tableArgument, millis);
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void fireTimer(Timer timer) throws Exception {
+        if (onTimer == null) {
+            throw new IllegalStateException(
+                    "Timer fired but no onTimer() method is defined in "

Review Comment:
   nit
   ```suggestion
                       "Timer fired but no valid onTimer() method is defined in 
"
   ```



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/InvocationContext.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.types.Row;
+
+import javax.annotation.Nullable;
+
+/** Captures the per-invocation state for an eval() or onTimer() call in the 
test harness. */

Review Comment:
   Expand the comment a bit: state that `partitionKey` is always set, that `row` and
   `tableArgumentName` are set for `eval()`, and that `firingTimer` is set for `onTimer()`?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {
+        return Stream.concat(
+                        timerManager.getPendingTimers().stream(),
+                        timerManager.getFiredTimers().stream())
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all pending (not yet fired) timers, sorted by timestamp then 
name. */
+    public List<Timer> getPendingTimers() {
+        return timerManager.getPendingTimers();
+    }
+
+    /** Returns all pending timers with the given name. */
+    public List<Timer> getPendingTimers(String timerName) {
+        return timerManager.getPendingTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all timers that have fired, in the order they fired. */
+    public List<Timer> getFiredTimers() {
+        return timerManager.getFiredTimers();
+    }
+
+    /** Returns all fired timers with the given name. */
+    public List<Timer> getFiredTimers(String timerName) {
+        return timerManager.getFiredTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Clears the fired timer history. */
+    public void clearFiredTimers() {
+        timerManager.clearFiredTimers();
+    }
+
+    private void setWatermarkMillis(long millis) throws Exception {
+        checkState(isOpen, "Harness is not open");
+        for (TableArgumentInfo tableArg : 
ArgumentInfo.filterTableArguments(arguments)) {
+            timerManager.setTableWatermark(tableArg.name, millis);
+        }
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void setWatermarkForTableMillis(String tableArgument, long millis) 
throws Exception {
+        checkState(isOpen, "Harness is not open");
+        checkNotNull(tableArgument, "tableArgument must not be null");
+        checkArgument(
+                argumentsByName.get(tableArgument) instanceof 
TableArgumentInfo,
+                "Unknown or non-table argument: %s",
+                tableArgument);
+        timerManager.setTableWatermark(tableArgument, millis);
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void fireTimer(Timer timer) throws Exception {
+        if (onTimer == null) {
+            throw new IllegalStateException(
+                    "Timer fired but no onTimer() method is defined in "
+                            + function.getClass().getSimpleName());
+        }
+
+        currentInvocation = InvocationContext.forTimer(timer);
+
+        try {
+            Map<String, Object> stateMap = 
stateManager.loadStateForKey(timer.partitionKey);
+
+            List<StateArgumentInfo> stateArgs = 
ArgumentInfo.filterStateArguments(arguments);
+            Object[] methodArgs = new Object[stateArgs.size()];
+            for (int i = 0; i < stateArgs.size(); i++) {
+                methodArgs[i] = stateMap.get(stateArgs.get(i).name);
+            }
+
+            onTimer.invoke(function, new TestOnTimerContext(stateMap), 
methodArgs);
+            stateManager.updateStateForKey(timer.partitionKey, stateMap);
+        } catch (InvocationTargetException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            }
+            throw new RuntimeException("onTimer() invocation failed", e);
+        } finally {
+            currentInvocation = null;
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Context implementations
+    // 
-------------------------------------------------------------------------
+
+    private class TestContext implements ProcessTableFunction.Context {
+        final Map<String, Object> stateMap;
+
+        TestContext(Map<String, Object> stateMap) {
+            this.stateMap = stateMap;
+        }
+
+        @Override
+        public <TimeType> ProcessTableFunction.TimeContext<TimeType> 
timeContext(
+                Class<TimeType> conversionClass) {
+            return new TestTimeContext<>(conversionClass);
+        }
+
+        @Override
+        public TableSemantics tableSemanticsFor(String argName) {
+            ArgumentInfo argInfo = argumentsByName.get(argName);
+            if (argInfo == null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' not found. Available arguments: 
%s",
+                                argName, argumentsByName.keySet()));
+            }
+            if (!(argInfo instanceof TableArgumentInfo)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' is not a table argument (type: 
%s)",
+                                argName, argInfo.getClass().getSimpleName()));
+            }
+            TableArgumentInfo tableArg = (TableArgumentInfo) argInfo;
+            int[] partitionIndices = getPartitionColumnIndices(tableArg);
+            int timeColumn =

Review Comment:
   nit
   ```suggestion
               int timeColumnIndex =
   ```



##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2465,6 +2465,115 @@ void testStateMutation() throws Exception {
 {{< /tab >}}
 {{< /tabs >}}
 
+#### Testing with Timers and Context
+
+The harness supports the `Context` parameter, timer registration via 
`TimeContext`, and `onTimer`
+callbacks. Use `.withOnTimeColumn()` to configure the event time column and 
`.setWatermark()` to
+advance watermarks and fire eligible timers.
+
+{{< tabs "timer-testing" >}}
+{{< tab "Java" >}}
+```java
+// A PTF that registers a named timer 5 seconds after each event, and emits 
when it fires.
+@DataTypeHint("ROW<message STRING>")
+public class TimerPTF extends ProcessTableFunction<Row> {
+  public void eval(
+      Context ctx,
+      @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, 
ArgumentTrait.REQUIRE_ON_TIME})
+          Row input) {
+    String name = input.getFieldAs("name");
+    TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
+    timeCtx.registerOnTime("timeout-" + name, 
timeCtx.time().plus(Duration.ofSeconds(5)));
+    collect(Row.of("registered-" + name));
+  }
+
+  public void onTimer(OnTimerContext ctx) {
+    collect(Row.of("timer-fired-" + ctx.currentTimer()));
+  }
+}
+
+@Test
+void testTimerRegistrationAndFiring() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(TimerPTF.class)
+          .withTableArgument("input",
+              DataTypes.of("ROW<partition STRING, name STRING, ts 
TIMESTAMP(3)>"))
+          .withPartitionBy("input", "partition")
+          .withOnTimeColumn("ts")
+          .build()) {
+
+    harness.processElement(Row.of("P1", "Alice", LocalDateTime.of(2025, 1, 1, 
0, 0, 1)));
+
+    // Verify the timer was registered
+    assertThat(harness.getPendingTimers()).hasSize(1);
+    
assertThat(harness.getPendingTimers().get(0).getName()).isEqualTo("timeout-Alice");

Review Comment:
   Is it possible to assert the timer's timestamp as well?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {
+        return Stream.concat(
+                        timerManager.getPendingTimers().stream(),
+                        timerManager.getFiredTimers().stream())
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all pending (not yet fired) timers, sorted by timestamp then 
name. */
+    public List<Timer> getPendingTimers() {
+        return timerManager.getPendingTimers();
+    }
+
+    /** Returns all pending timers with the given name. */
+    public List<Timer> getPendingTimers(String timerName) {
+        return timerManager.getPendingTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all timers that have fired, in the order they fired. */
+    public List<Timer> getFiredTimers() {
+        return timerManager.getFiredTimers();
+    }
+
+    /** Returns all fired timers with the given name. */
+    public List<Timer> getFiredTimers(String timerName) {
+        return timerManager.getFiredTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Clears the fired timer history. */
+    public void clearFiredTimers() {
+        timerManager.clearFiredTimers();
+    }
+
+    private void setWatermarkMillis(long millis) throws Exception {
+        checkState(isOpen, "Harness is not open");
+        for (TableArgumentInfo tableArg : 
ArgumentInfo.filterTableArguments(arguments)) {
+            timerManager.setTableWatermark(tableArg.name, millis);
+        }
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void setWatermarkForTableMillis(String tableArgument, long millis) 
throws Exception {
+        checkState(isOpen, "Harness is not open");
+        checkNotNull(tableArgument, "tableArgument must not be null");
+        checkArgument(
+                argumentsByName.get(tableArgument) instanceof 
TableArgumentInfo,
+                "Unknown or non-table argument: %s",
+                tableArgument);
+        timerManager.setTableWatermark(tableArgument, millis);
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void fireTimer(Timer timer) throws Exception {
+        if (onTimer == null) {
+            throw new IllegalStateException(
+                    "Timer fired but no onTimer() method is defined in "
+                            + function.getClass().getSimpleName());
+        }
+
+        currentInvocation = InvocationContext.forTimer(timer);
+
+        try {
+            Map<String, Object> stateMap = 
stateManager.loadStateForKey(timer.partitionKey);
+
+            List<StateArgumentInfo> stateArgs = 
ArgumentInfo.filterStateArguments(arguments);
+            Object[] methodArgs = new Object[stateArgs.size()];
+            for (int i = 0; i < stateArgs.size(); i++) {
+                methodArgs[i] = stateMap.get(stateArgs.get(i).name);
+            }
+
+            onTimer.invoke(function, new TestOnTimerContext(stateMap), 
methodArgs);
+            stateManager.updateStateForKey(timer.partitionKey, stateMap);
+        } catch (InvocationTargetException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            }
+            throw new RuntimeException("onTimer() invocation failed", e);
+        } finally {
+            currentInvocation = null;
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Context implementations
+    // 
-------------------------------------------------------------------------
+
+    private class TestContext implements ProcessTableFunction.Context {
+        final Map<String, Object> stateMap;
+
+        TestContext(Map<String, Object> stateMap) {
+            this.stateMap = stateMap;
+        }
+
+        @Override
+        public <TimeType> ProcessTableFunction.TimeContext<TimeType> 
timeContext(
+                Class<TimeType> conversionClass) {
+            return new TestTimeContext<>(conversionClass);
+        }
+
+        @Override
+        public TableSemantics tableSemanticsFor(String argName) {
+            ArgumentInfo argInfo = argumentsByName.get(argName);
+            if (argInfo == null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' not found. Available arguments: 
%s",
+                                argName, argumentsByName.keySet()));

Review Comment:
   Should the list of available arguments in this error message already be filtered down to table arguments only?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -655,6 +1083,56 @@ public Builder<OUT> withPartitionBy(String argumentName, 
String... columnNames)
             return this;
         }
 
+        // 
---------------------------------------------------------------------
+        // Timer & Watermark Configuration
+        // 
---------------------------------------------------------------------
+
+        /**
+         * Configures the on-time column name for the function.
+         *
+         * @param columnName The column that carries event time
+         */
+        public Builder<OUT> withOnTimeColumn(String columnName) {
+            checkNotNull(columnName, "columnName must not be null");
+            this.onTimeColumnName = columnName;
+            return this;
+        }
+
+        // 
---------------------------------------------------------------------
+        // Watermark
+        // 
---------------------------------------------------------------------
+
+        /** Sets the initial watermark for all table arguments. */
+        public Builder<OUT> withInitialWatermark(LocalDateTime watermark) {
+            checkNotNull(watermark, "watermark must not be null");
+            this.initialWatermarkForAll = 
DateTimeUtils.toTimestampMillis(watermark);
+            return this;
+        }
+
+        /** Sets the initial watermark for all table arguments. */
+        public Builder<OUT> withInitialWatermark(Instant watermark) {
+            checkNotNull(watermark, "watermark must not be null");
+            this.initialWatermarkForAll = watermark.toEpochMilli();
+            return this;
+        }
+
+        /** Sets the initial watermark for a specific table argument. */
+        public Builder<OUT> withInitialWatermarkForTable(
+                String tableArgument, LocalDateTime watermark) {
+            checkNotNull(tableArgument, "tableArgument must not be null");
+            checkNotNull(watermark, "watermark must not be null");
+            initialWatermarks.put(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+            return this;
+        }
+
+        /** Sets the initial watermark for a specific table argument. */
+        public Builder<OUT> withInitialWatermarkForTable(String tableArgument, 
Instant watermark) {
+            checkNotNull(tableArgument, "tableArgument must not be null");
+            checkNotNull(watermark, "watermark must not be null");
+            initialWatermarks.put(tableArgument, watermark.toEpochMilli());
+            return this;
+        }

Review Comment:
   see above



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ResolvedMethod.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * A resolved PTF method (eval or onTimer) paired with whether its first 
parameter accepts a
+ * Context.
+ */
+class ResolvedMethod {
+    final Method method;
+    final boolean takesContext;
+
+    static ResolvedMethod of(Method method, Class<?> contextClass) {

Review Comment:
   ```suggestion
       static ResolvedMethod<T> of(Method method, Class<T> contextClass) {
   ```



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -823,49 +1351,78 @@ private Method findEvalMethod() throws 
NoSuchMethodException {
                         "Multiple eval() methods found in "
                                 + functionClass.getSimpleName()
                                 + ". ProcessTableFunction must have exactly 
one eval() method.");
-            } else {
-                return evalMethod;
             }
-        }
 
-        /**
-         * Validates that the eval() method doesn't use unsupported features. 
Temporary, until
-         * context is supported.
-         */
-        private void validateEvalMethodSupported(Method evalMethod, 
List<ArgumentInfo> arguments) {
-            Parameter[] parameters = evalMethod.getParameters();
+            return evalMethod;
+        }
 
-            for (int i = 0; i < parameters.length; i++) {
-                Parameter param = parameters[i];
-                Class<?> paramType = param.getType();
+        @Nullable
+        private static Method findOnTimerMethod(
+                Class<?> functionClass, List<ArgumentInfo> arguments) {
+            List<Method> candidates = 
ExtractionUtils.collectMethods(functionClass, "onTimer");
+            if (candidates.isEmpty()) {

Review Comment:
   shouldn't we also fail if we have more than one candidate?
   How would the framework decide which `onTimer` method to call?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTimerManager.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Timer and watermark manager for {@link ProcessTableFunctionTestHarness}.
+ *
+ * <p>Handles timer registration, clearing, watermark tracking, and 
determining which timers are
+ * eligible to fire.
+ */
+@Internal
+class TestHarnessTimerManager {
+
+    private final Map<Row, Set<Timer>> pendingTimersByPartition = new 
HashMap<>();
+    private final List<Timer> firedTimers = new ArrayList<>();
+    private final Map<String, Long> watermarkByTable = new HashMap<>();
+    @Nullable private Long globalWatermark;
+
+    TestHarnessTimerManager() {}
+
+    // 
-------------------------------------------------------------------------
+    // Watermark
+    // 
-------------------------------------------------------------------------
+
+    void setTableWatermark(String tableName, long absoluteMillis) {
+        Long current = watermarkByTable.get(tableName);
+        if (current != null && absoluteMillis < current) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Cannot move watermark backward for table '%s': 
current=%d, new=%d",
+                            tableName, current, absoluteMillis));
+        }
+        watermarkByTable.put(tableName, absoluteMillis);
+    }
+
+    void updateGlobalWatermarkAndFireTimers(ThrowingConsumer<Timer, Exception> 
firer)
+            throws Exception {
+        if (watermarkByTable.isEmpty()) {
+            return;
+        }
+        long newGlobalWatermark = Collections.min(watermarkByTable.values());
+        if (globalWatermark != null && newGlobalWatermark < globalWatermark) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Cannot move global watermark backward: 
current=%d, new=%d",
+                            globalWatermark, newGlobalWatermark));
+        }
+        globalWatermark = newGlobalWatermark;
+        fireEligibleTimers(newGlobalWatermark, firer);
+    }
+
+    @Nullable
+    Long getGlobalWatermark() {
+        return globalWatermark;
+    }
+
+    @Nullable
+    Long getWatermarkForTable(String tableName) {
+        return watermarkByTable.get(tableName);
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Timer Registration
+    // 
-------------------------------------------------------------------------
+
+    void register(Row partitionKey, long timestampMillis, @Nullable String 
name) {
+        Set<Timer> timerSet =
+                pendingTimersByPartition.computeIfAbsent(partitionKey, k -> 
new HashSet<>());
+
+        if (name != null) {
+            timerSet.removeIf(t -> name.equals(t.name));
+        }

Review Comment:
   I think we should also remove unnamed timers. 
   There can only be one timer per timestamp and name (also if name = `null`).



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -711,25 +1197,67 @@ public ProcessTableFunctionTestHarness<OUT> build() 
throws Exception {
 
             // Extract table arguments for output type derivation
             // SystemTypeInference needs table semantics for pass-through 
column deduplication
-            List<TableArgumentInfo> tableArgs = 
ArgumentInfo.filterTableArguments(arguments);
+            List<TableArgumentInfo> tableArgInfos = 
ArgumentInfo.filterTableArguments(arguments);
 
             // Derive output schema using SystemTypeInference
             DataType derivedOutputType =
                     deriveOutputTypeFromSystemInference(
-                            function, dataTypeFactory, systemTypeInference, 
tableArgs);
+                            function,
+                            dataTypeFactory,
+                            systemTypeInference,
+                            arguments,
+                            tableArgInfos);
 
             // Create output converter for PTF emissions
             DataStructureConverter<Object, Object> harnessOutputConverter =
                     createPTFOutputConverter(derivedOutputType);
 
+            // Validate onTimeColumn configuration
+            if (onTimeColumnName != null) {
+                boolean foundInAnyTable =
+                        tableArgInfos.stream()
+                                .anyMatch(
+                                        t -> 
getFieldNames(t.dataType).contains(onTimeColumnName));

Review Comment:
   Do we also need to check whether the tables with an `on_time` column have set 
semantics (and partitioning)?



##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2465,6 +2465,115 @@ void testStateMutation() throws Exception {
 {{< /tab >}}
 {{< /tabs >}}
 
+#### Testing with Timers and Context
+
+The harness supports the `Context` parameter, timer registration via 
`TimeContext`, and `onTimer`
+callbacks. Use `.withOnTimeColumn()` to configure the event time column and 
`.setWatermark()` to
+advance watermarks and fire eligible timers.
+
+{{< tabs "timer-testing" >}}
+{{< tab "Java" >}}
+```java
+// A PTF that registers a named timer 5 seconds after each event, and emits 
when it fires.
+@DataTypeHint("ROW<message STRING>")
+public class TimerPTF extends ProcessTableFunction<Row> {
+  public void eval(
+      Context ctx,
+      @ArgumentHint({ArgumentTrait.SET_SEMANTIC_TABLE, 
ArgumentTrait.REQUIRE_ON_TIME})
+          Row input) {
+    String name = input.getFieldAs("name");
+    TimeContext<LocalDateTime> timeCtx = ctx.timeContext(LocalDateTime.class);
+    timeCtx.registerOnTime("timeout-" + name, 
timeCtx.time().plus(Duration.ofSeconds(5)));
+    collect(Row.of("registered-" + name));
+  }
+
+  public void onTimer(OnTimerContext ctx) {
+    collect(Row.of("timer-fired-" + ctx.currentTimer()));
+  }
+}
+
+@Test
+void testTimerRegistrationAndFiring() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+      ProcessTableFunctionTestHarness.ofClass(TimerPTF.class)
+          .withTableArgument("input",
+              DataTypes.of("ROW<partition STRING, name STRING, ts 
TIMESTAMP(3)>"))
+          .withPartitionBy("input", "partition")
+          .withOnTimeColumn("ts")
+          .build()) {
+
+    harness.processElement(Row.of("P1", "Alice", LocalDateTime.of(2025, 1, 1, 
0, 0, 1)));
+
+    // Verify the timer was registered
+    assertThat(harness.getPendingTimers()).hasSize(1);
+    
assertThat(harness.getPendingTimers().get(0).getName()).isEqualTo("timeout-Alice");
+
+    // Advance watermark past the timer's timestamp to fire it
+    harness.clearOutput();

Review Comment:
   the output should still be empty, no?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.

Review Comment:
   nit:
   I would rephrase the `fires eligible timers` part a bit because this only 
happens once the watermarks on the other tables have been advanced far enough as well.



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {
+        return Stream.concat(
+                        timerManager.getPendingTimers().stream(),
+                        timerManager.getFiredTimers().stream())
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all pending (not yet fired) timers, sorted by timestamp then 
name. */
+    public List<Timer> getPendingTimers() {
+        return timerManager.getPendingTimers();
+    }
+
+    /** Returns all pending timers with the given name. */
+    public List<Timer> getPendingTimers(String timerName) {
+        return timerManager.getPendingTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all timers that have fired, in the order they fired. */
+    public List<Timer> getFiredTimers() {
+        return timerManager.getFiredTimers();
+    }
+
+    /** Returns all fired timers with the given name. */
+    public List<Timer> getFiredTimers(String timerName) {

Review Comment:
   also add a method to get fired timers by partition key?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {
+        return Stream.concat(
+                        timerManager.getPendingTimers().stream(),
+                        timerManager.getFiredTimers().stream())
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all pending (not yet fired) timers, sorted by timestamp then 
name. */
+    public List<Timer> getPendingTimers() {
+        return timerManager.getPendingTimers();
+    }
+
+    /** Returns all pending timers with the given name. */
+    public List<Timer> getPendingTimers(String timerName) {

Review Comment:
   Maybe also add a `getPendingTimers(Row partitionKey)` method?



##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2582,8 +2691,5 @@ void testPOJO() throws Exception {
 
 ### PTF Features Unsupported by the TestHarness
 
-- `Context` parameter
-- Timers (`onTimer`)
-- `on_time` / `rowtime`

Review Comment:
   🙌 



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {

Review Comment:
   nit:
   ```suggestion
       public List<Timer> getAllTimers() {
   ```
   I think returning fired timers might be unexpected, so using "all" might 
highlight this.



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {
+        return Stream.concat(
+                        timerManager.getPendingTimers().stream(),
+                        timerManager.getFiredTimers().stream())
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all pending (not yet fired) timers, sorted by timestamp then 
name. */
+    public List<Timer> getPendingTimers() {
+        return timerManager.getPendingTimers();
+    }
+
+    /** Returns all pending timers with the given name. */
+    public List<Timer> getPendingTimers(String timerName) {
+        return timerManager.getPendingTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all timers that have fired, in the order they fired. */
+    public List<Timer> getFiredTimers() {
+        return timerManager.getFiredTimers();
+    }
+
+    /** Returns all fired timers with the given name. */
+    public List<Timer> getFiredTimers(String timerName) {
+        return timerManager.getFiredTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Clears the fired timer history. */
+    public void clearFiredTimers() {
+        timerManager.clearFiredTimers();
+    }
+
+    private void setWatermarkMillis(long millis) throws Exception {
+        checkState(isOpen, "Harness is not open");
+        for (TableArgumentInfo tableArg : 
ArgumentInfo.filterTableArguments(arguments)) {
+            timerManager.setTableWatermark(tableArg.name, millis);
+        }
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void setWatermarkForTableMillis(String tableArgument, long millis) 
throws Exception {
+        checkState(isOpen, "Harness is not open");
+        checkNotNull(tableArgument, "tableArgument must not be null");
+        checkArgument(
+                argumentsByName.get(tableArgument) instanceof 
TableArgumentInfo,
+                "Unknown or non-table argument: %s",
+                tableArgument);
+        timerManager.setTableWatermark(tableArgument, millis);
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void fireTimer(Timer timer) throws Exception {
+        if (onTimer == null) {
+            throw new IllegalStateException(
+                    "Timer fired but no onTimer() method is defined in "
+                            + function.getClass().getSimpleName());
+        }
+
+        currentInvocation = InvocationContext.forTimer(timer);
+
+        try {
+            Map<String, Object> stateMap = 
stateManager.loadStateForKey(timer.partitionKey);
+
+            List<StateArgumentInfo> stateArgs = 
ArgumentInfo.filterStateArguments(arguments);
+            Object[] methodArgs = new Object[stateArgs.size()];
+            for (int i = 0; i < stateArgs.size(); i++) {
+                methodArgs[i] = stateMap.get(stateArgs.get(i).name);
+            }
+
+            onTimer.invoke(function, new TestOnTimerContext(stateMap), 
methodArgs);
+            stateManager.updateStateForKey(timer.partitionKey, stateMap);

Review Comment:
   move out of `try` block as well?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {
+        return Stream.concat(
+                        timerManager.getPendingTimers().stream(),
+                        timerManager.getFiredTimers().stream())
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all pending (not yet fired) timers, sorted by timestamp then 
name. */
+    public List<Timer> getPendingTimers() {
+        return timerManager.getPendingTimers();
+    }
+
+    /** Returns all pending timers with the given name. */
+    public List<Timer> getPendingTimers(String timerName) {
+        return timerManager.getPendingTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all timers that have fired, in the order they fired. */
+    public List<Timer> getFiredTimers() {
+        return timerManager.getFiredTimers();
+    }
+
+    /** Returns all fired timers with the given name. */
+    public List<Timer> getFiredTimers(String timerName) {
+        return timerManager.getFiredTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Clears the fired timer history. */
+    public void clearFiredTimers() {
+        timerManager.clearFiredTimers();
+    }
+
+    private void setWatermarkMillis(long millis) throws Exception {
+        checkState(isOpen, "Harness is not open");
+        for (TableArgumentInfo tableArg : 
ArgumentInfo.filterTableArguments(arguments)) {
+            timerManager.setTableWatermark(tableArg.name, millis);
+        }
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void setWatermarkForTableMillis(String tableArgument, long millis) 
throws Exception {
+        checkState(isOpen, "Harness is not open");
+        checkNotNull(tableArgument, "tableArgument must not be null");
+        checkArgument(
+                argumentsByName.get(tableArgument) instanceof 
TableArgumentInfo,
+                "Unknown or non-table argument: %s",
+                tableArgument);
+        timerManager.setTableWatermark(tableArgument, millis);
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void fireTimer(Timer timer) throws Exception {
+        if (onTimer == null) {
+            throw new IllegalStateException(
+                    "Timer fired but no onTimer() method is defined in "
+                            + function.getClass().getSimpleName());
+        }
+
+        currentInvocation = InvocationContext.forTimer(timer);
+
+        try {
+            Map<String, Object> stateMap = 
stateManager.loadStateForKey(timer.partitionKey);
+
+            List<StateArgumentInfo> stateArgs = 
ArgumentInfo.filterStateArguments(arguments);
+            Object[] methodArgs = new Object[stateArgs.size()];
+            for (int i = 0; i < stateArgs.size(); i++) {
+                methodArgs[i] = stateMap.get(stateArgs.get(i).name);
+            }
+
+            onTimer.invoke(function, new TestOnTimerContext(stateMap), 
methodArgs);
+            stateManager.updateStateForKey(timer.partitionKey, stateMap);
+        } catch (InvocationTargetException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            }
+            throw new RuntimeException("onTimer() invocation failed", e);
+        } finally {
+            currentInvocation = null;
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Context implementations
+    // 
-------------------------------------------------------------------------
+
+    private class TestContext implements ProcessTableFunction.Context {
+        final Map<String, Object> stateMap;
+
+        TestContext(Map<String, Object> stateMap) {
+            this.stateMap = stateMap;
+        }
+
+        @Override
+        public <TimeType> ProcessTableFunction.TimeContext<TimeType> 
timeContext(
+                Class<TimeType> conversionClass) {
+            return new TestTimeContext<>(conversionClass);
+        }
+
+        @Override
+        public TableSemantics tableSemanticsFor(String argName) {
+            ArgumentInfo argInfo = argumentsByName.get(argName);
+            if (argInfo == null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' not found. Available arguments: 
%s",
+                                argName, argumentsByName.keySet()));
+            }
+            if (!(argInfo instanceof TableArgumentInfo)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' is not a table argument (type: 
%s)",
+                                argName, argInfo.getClass().getSimpleName()));
+            }
+            TableArgumentInfo tableArg = (TableArgumentInfo) argInfo;
+            int[] partitionIndices = getPartitionColumnIndices(tableArg);
+            int timeColumn =
+                    onTimeColumnName != null
+                            ? 
getFieldNames(tableArg.dataType).indexOf(onTimeColumnName)
+                            : -1;
+            return new TestHarnessTableSemantics(tableArg.dataType, 
partitionIndices, timeColumn);
+        }
+
+        @Override
+        public void clearState(String stateName) {
+            stateMap.remove(stateName);
+        }
+
+        @Override
+        public void clearAllState() {
+            stateMap.clear();

Review Comment:
   Should `clearAllState()` be scoped to the partitionKey (like `clearAllTimers()`)?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {
+        return Stream.concat(
+                        timerManager.getPendingTimers().stream(),
+                        timerManager.getFiredTimers().stream())
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all pending (not yet fired) timers, sorted by timestamp then 
name. */
+    public List<Timer> getPendingTimers() {
+        return timerManager.getPendingTimers();
+    }
+
+    /** Returns all pending timers with the given name. */
+    public List<Timer> getPendingTimers(String timerName) {
+        return timerManager.getPendingTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all timers that have fired, in the order they fired. */
+    public List<Timer> getFiredTimers() {
+        return timerManager.getFiredTimers();
+    }
+
+    /** Returns all fired timers with the given name. */
+    public List<Timer> getFiredTimers(String timerName) {
+        return timerManager.getFiredTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Clears the fired timer history. */
+    public void clearFiredTimers() {
+        timerManager.clearFiredTimers();
+    }
+
+    private void setWatermarkMillis(long millis) throws Exception {
+        checkState(isOpen, "Harness is not open");
+        for (TableArgumentInfo tableArg : 
ArgumentInfo.filterTableArguments(arguments)) {
+            timerManager.setTableWatermark(tableArg.name, millis);
+        }
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void setWatermarkForTableMillis(String tableArgument, long millis) 
throws Exception {
+        checkState(isOpen, "Harness is not open");
+        checkNotNull(tableArgument, "tableArgument must not be null");
+        checkArgument(
+                argumentsByName.get(tableArgument) instanceof 
TableArgumentInfo,
+                "Unknown or non-table argument: %s",
+                tableArgument);
+        timerManager.setTableWatermark(tableArgument, millis);
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void fireTimer(Timer timer) throws Exception {
+        if (onTimer == null) {
+            throw new IllegalStateException(
+                    "Timer fired but no onTimer() method is defined in "
+                            + function.getClass().getSimpleName());
+        }
+
+        currentInvocation = InvocationContext.forTimer(timer);
+
+        try {
+            Map<String, Object> stateMap = 
stateManager.loadStateForKey(timer.partitionKey);
+
+            List<StateArgumentInfo> stateArgs = 
ArgumentInfo.filterStateArguments(arguments);
+            Object[] methodArgs = new Object[stateArgs.size()];
+            for (int i = 0; i < stateArgs.size(); i++) {
+                methodArgs[i] = stateMap.get(stateArgs.get(i).name);
+            }
+
+            onTimer.invoke(function, new TestOnTimerContext(stateMap), 
methodArgs);
+            stateManager.updateStateForKey(timer.partitionKey, stateMap);
+        } catch (InvocationTargetException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            }
+            throw new RuntimeException("onTimer() invocation failed", e);
+        } finally {
+            currentInvocation = null;
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Context implementations
+    // 
-------------------------------------------------------------------------
+
+    private class TestContext implements ProcessTableFunction.Context {
+        final Map<String, Object> stateMap;
+
+        TestContext(Map<String, Object> stateMap) {
+            this.stateMap = stateMap;
+        }
+
+        @Override
+        public <TimeType> ProcessTableFunction.TimeContext<TimeType> 
timeContext(
+                Class<TimeType> conversionClass) {
+            return new TestTimeContext<>(conversionClass);
+        }
+
+        @Override
+        public TableSemantics tableSemanticsFor(String argName) {
+            ArgumentInfo argInfo = argumentsByName.get(argName);
+            if (argInfo == null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' not found. Available arguments: 
%s",
+                                argName, argumentsByName.keySet()));
+            }
+            if (!(argInfo instanceof TableArgumentInfo)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' is not a table argument (type: 
%s)",
+                                argName, argInfo.getClass().getSimpleName()));
+            }
+            TableArgumentInfo tableArg = (TableArgumentInfo) argInfo;
+            int[] partitionIndices = getPartitionColumnIndices(tableArg);
+            int timeColumn =
+                    onTimeColumnName != null
+                            ? 
getFieldNames(tableArg.dataType).indexOf(onTimeColumnName)
+                            : -1;
+            return new TestHarnessTableSemantics(tableArg.dataType, 
partitionIndices, timeColumn);
+        }
+
+        @Override
+        public void clearState(String stateName) {
+            stateMap.remove(stateName);

Review Comment:
   I think `clearState()` should be scoped to the partitionKey (like `clearAllTimers()`).



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {
+        return Stream.concat(
+                        timerManager.getPendingTimers().stream(),
+                        timerManager.getFiredTimers().stream())
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all pending (not yet fired) timers, sorted by timestamp then 
name. */
+    public List<Timer> getPendingTimers() {
+        return timerManager.getPendingTimers();
+    }
+
+    /** Returns all pending timers with the given name. */
+    public List<Timer> getPendingTimers(String timerName) {
+        return timerManager.getPendingTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all timers that have fired, in the order they fired. */
+    public List<Timer> getFiredTimers() {
+        return timerManager.getFiredTimers();
+    }
+
+    /** Returns all fired timers with the given name. */
+    public List<Timer> getFiredTimers(String timerName) {
+        return timerManager.getFiredTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Clears the fired timer history. */
+    public void clearFiredTimers() {
+        timerManager.clearFiredTimers();
+    }
+
+    private void setWatermarkMillis(long millis) throws Exception {
+        checkState(isOpen, "Harness is not open");
+        for (TableArgumentInfo tableArg : 
ArgumentInfo.filterTableArguments(arguments)) {
+            timerManager.setTableWatermark(tableArg.name, millis);
+        }
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void setWatermarkForTableMillis(String tableArgument, long millis) 
throws Exception {
+        checkState(isOpen, "Harness is not open");
+        checkNotNull(tableArgument, "tableArgument must not be null");
+        checkArgument(
+                argumentsByName.get(tableArgument) instanceof 
TableArgumentInfo,
+                "Unknown or non-table argument: %s",
+                tableArgument);
+        timerManager.setTableWatermark(tableArgument, millis);
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void fireTimer(Timer timer) throws Exception {
+        if (onTimer == null) {
+            throw new IllegalStateException(
+                    "Timer fired but no onTimer() method is defined in "
+                            + function.getClass().getSimpleName());
+        }
+
+        currentInvocation = InvocationContext.forTimer(timer);
+
+        try {
+            Map<String, Object> stateMap = 
stateManager.loadStateForKey(timer.partitionKey);
+
+            List<StateArgumentInfo> stateArgs = 
ArgumentInfo.filterStateArguments(arguments);
+            Object[] methodArgs = new Object[stateArgs.size()];
+            for (int i = 0; i < stateArgs.size(); i++) {
+                methodArgs[i] = stateMap.get(stateArgs.get(i).name);
+            }

Review Comment:
   Move this setup code (loading the state and building `methodArgs`) out of the
   `try` block? It's not throwing anything that's being caught.



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {
+        return Stream.concat(
+                        timerManager.getPendingTimers().stream(),
+                        timerManager.getFiredTimers().stream())
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all pending (not yet fired) timers, sorted by timestamp then 
name. */
+    public List<Timer> getPendingTimers() {
+        return timerManager.getPendingTimers();
+    }
+
+    /** Returns all pending timers with the given name. */
+    public List<Timer> getPendingTimers(String timerName) {
+        return timerManager.getPendingTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all timers that have fired, in the order they fired. */
+    public List<Timer> getFiredTimers() {
+        return timerManager.getFiredTimers();
+    }
+
+    /** Returns all fired timers with the given name. */
+    public List<Timer> getFiredTimers(String timerName) {
+        return timerManager.getFiredTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Clears the fired timer history. */
+    public void clearFiredTimers() {
+        timerManager.clearFiredTimers();
+    }
+
+    private void setWatermarkMillis(long millis) throws Exception {
+        checkState(isOpen, "Harness is not open");
+        for (TableArgumentInfo tableArg : 
ArgumentInfo.filterTableArguments(arguments)) {
+            timerManager.setTableWatermark(tableArg.name, millis);
+        }
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void setWatermarkForTableMillis(String tableArgument, long millis) 
throws Exception {
+        checkState(isOpen, "Harness is not open");
+        checkNotNull(tableArgument, "tableArgument must not be null");
+        checkArgument(
+                argumentsByName.get(tableArgument) instanceof 
TableArgumentInfo,
+                "Unknown or non-table argument: %s",
+                tableArgument);
+        timerManager.setTableWatermark(tableArgument, millis);
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void fireTimer(Timer timer) throws Exception {
+        if (onTimer == null) {
+            throw new IllegalStateException(
+                    "Timer fired but no onTimer() method is defined in "
+                            + function.getClass().getSimpleName());
+        }
+
+        currentInvocation = InvocationContext.forTimer(timer);
+
+        try {
+            Map<String, Object> stateMap = 
stateManager.loadStateForKey(timer.partitionKey);
+
+            List<StateArgumentInfo> stateArgs = 
ArgumentInfo.filterStateArguments(arguments);
+            Object[] methodArgs = new Object[stateArgs.size()];
+            for (int i = 0; i < stateArgs.size(); i++) {
+                methodArgs[i] = stateMap.get(stateArgs.get(i).name);
+            }
+
+            onTimer.invoke(function, new TestOnTimerContext(stateMap), 
methodArgs);
+            stateManager.updateStateForKey(timer.partitionKey, stateMap);
+        } catch (InvocationTargetException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            }
+            throw new RuntimeException("onTimer() invocation failed", e);
+        } finally {
+            currentInvocation = null;
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Context implementations
+    // 
-------------------------------------------------------------------------
+
+    private class TestContext implements ProcessTableFunction.Context {
+        final Map<String, Object> stateMap;
+
+        TestContext(Map<String, Object> stateMap) {
+            this.stateMap = stateMap;
+        }
+
+        @Override
+        public <TimeType> ProcessTableFunction.TimeContext<TimeType> 
timeContext(
+                Class<TimeType> conversionClass) {
+            return new TestTimeContext<>(conversionClass);
+        }
+
+        @Override
+        public TableSemantics tableSemanticsFor(String argName) {
+            ArgumentInfo argInfo = argumentsByName.get(argName);
+            if (argInfo == null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' not found. Available arguments: 
%s",
+                                argName, argumentsByName.keySet()));
+            }
+            if (!(argInfo instanceof TableArgumentInfo)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' is not a table argument (type: 
%s)",
+                                argName, argInfo.getClass().getSimpleName()));
+            }
+            TableArgumentInfo tableArg = (TableArgumentInfo) argInfo;
+            int[] partitionIndices = getPartitionColumnIndices(tableArg);
+            int timeColumn =
+                    onTimeColumnName != null
+                            ? 
getFieldNames(tableArg.dataType).indexOf(onTimeColumnName)
+                            : -1;
+            return new TestHarnessTableSemantics(tableArg.dataType, 
partitionIndices, timeColumn);
+        }
+
+        @Override
+        public void clearState(String stateName) {
+            stateMap.remove(stateName);
+        }
+
+        @Override
+        public void clearAllState() {
+            stateMap.clear();
+        }
+
+        @Override
+        public void clearAllTimers() {
+            timerManager.clearAll(currentInvocation.partitionKey);
+        }
+
+        @Override
+        public void clearAll() {
+            stateMap.clear();

Review Comment:
   Should `clearAll()` be scoped to the partitionKey (like `clearAllTimers()`)?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -543,6 +968,9 @@ public static class Builder<OUT> {
         private final Map<String, TableArgumentConfiguration> tableArgs = new 
HashMap<>();
         private final Map<String, PartitionConfiguration> partitionConfigs = 
new HashMap<>();
         private final Map<String, StateArgumentConfiguration> stateArgs = new 
HashMap<>();
+        @Nullable private String onTimeColumnName = null;
+        private final Map<String, Long> initialWatermarks = new HashMap<>();
+        @Nullable private Long initialWatermarkForAll = null;

Review Comment:
   I'm wondering if we really need the initial-watermark fields
   (`initialWatermarks`, `initialWatermarkForAll`) in the builder.
   Users could just call the `setWatermark()` methods right after creating / 
opening the harness.
   That would have the same effect and wouldn't really add complexity.
   
   So, IMO we could drop these from the builder. WDYT?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {
+        return Stream.concat(
+                        timerManager.getPendingTimers().stream(),
+                        timerManager.getFiredTimers().stream())
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all pending (not yet fired) timers, sorted by timestamp then 
name. */
+    public List<Timer> getPendingTimers() {
+        return timerManager.getPendingTimers();
+    }
+
+    /** Returns all pending timers with the given name. */
+    public List<Timer> getPendingTimers(String timerName) {
+        return timerManager.getPendingTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all timers that have fired, in the order they fired. */
+    public List<Timer> getFiredTimers() {
+        return timerManager.getFiredTimers();
+    }
+
+    /** Returns all fired timers with the given name. */
+    public List<Timer> getFiredTimers(String timerName) {
+        return timerManager.getFiredTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Clears the fired timer history. */
+    public void clearFiredTimers() {
+        timerManager.clearFiredTimers();
+    }
+
+    private void setWatermarkMillis(long millis) throws Exception {
+        checkState(isOpen, "Harness is not open");
+        for (TableArgumentInfo tableArg : 
ArgumentInfo.filterTableArguments(arguments)) {
+            timerManager.setTableWatermark(tableArg.name, millis);
+        }
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void setWatermarkForTableMillis(String tableArgument, long millis) 
throws Exception {
+        checkState(isOpen, "Harness is not open");
+        checkNotNull(tableArgument, "tableArgument must not be null");
+        checkArgument(
+                argumentsByName.get(tableArgument) instanceof 
TableArgumentInfo,
+                "Unknown or non-table argument: %s",
+                tableArgument);
+        timerManager.setTableWatermark(tableArgument, millis);
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void fireTimer(Timer timer) throws Exception {
+        if (onTimer == null) {
+            throw new IllegalStateException(
+                    "Timer fired but no onTimer() method is defined in "
+                            + function.getClass().getSimpleName());
+        }
+
+        currentInvocation = InvocationContext.forTimer(timer);
+
+        try {
+            Map<String, Object> stateMap = 
stateManager.loadStateForKey(timer.partitionKey);
+
+            List<StateArgumentInfo> stateArgs = 
ArgumentInfo.filterStateArguments(arguments);
+            Object[] methodArgs = new Object[stateArgs.size()];
+            for (int i = 0; i < stateArgs.size(); i++) {
+                methodArgs[i] = stateMap.get(stateArgs.get(i).name);
+            }
+
+            onTimer.invoke(function, new TestOnTimerContext(stateMap), 
methodArgs);
+            stateManager.updateStateForKey(timer.partitionKey, stateMap);
+        } catch (InvocationTargetException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            }
+            throw new RuntimeException("onTimer() invocation failed", e);
+        } finally {
+            currentInvocation = null;
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Context implementations
+    // 
-------------------------------------------------------------------------
+
+    private class TestContext implements ProcessTableFunction.Context {
+        final Map<String, Object> stateMap;
+
+        TestContext(Map<String, Object> stateMap) {
+            this.stateMap = stateMap;
+        }
+
+        @Override
+        public <TimeType> ProcessTableFunction.TimeContext<TimeType> 
timeContext(
+                Class<TimeType> conversionClass) {
+            return new TestTimeContext<>(conversionClass);
+        }
+
+        @Override
+        public TableSemantics tableSemanticsFor(String argName) {
+            ArgumentInfo argInfo = argumentsByName.get(argName);
+            if (argInfo == null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' not found. Available arguments: 
%s",
+                                argName, argumentsByName.keySet()));
+            }
+            if (!(argInfo instanceof TableArgumentInfo)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' is not a table argument (type: 
%s)",
+                                argName, argInfo.getClass().getSimpleName()));
+            }
+            TableArgumentInfo tableArg = (TableArgumentInfo) argInfo;
+            int[] partitionIndices = getPartitionColumnIndices(tableArg);
+            int timeColumn =
+                    onTimeColumnName != null
+                            ? 
getFieldNames(tableArg.dataType).indexOf(onTimeColumnName)
+                            : -1;
+            return new TestHarnessTableSemantics(tableArg.dataType, 
partitionIndices, timeColumn);
+        }
+
+        @Override
+        public void clearState(String stateName) {
+            stateMap.remove(stateName);
+        }
+
+        @Override
+        public void clearAllState() {
+            stateMap.clear();
+        }
+
+        @Override
+        public void clearAllTimers() {
+            timerManager.clearAll(currentInvocation.partitionKey);
+        }
+
+        @Override
+        public void clearAll() {
+            stateMap.clear();
+            timerManager.clearAll(currentInvocation.partitionKey);
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return ChangelogMode.insertOnly();
+        }
+    }
+
+    private class TestTimeContext<TimeType> implements 
ProcessTableFunction.TimeContext<TimeType> {
+        private final Class<TimeType> conversionClass;
+
+        TestTimeContext(Class<TimeType> conversionClass) {
+            this.conversionClass = conversionClass;
+        }
+
+        @Override
+        public TimeType time() {
+            InvocationContext ctx = currentInvocation;
+            if (ctx.isTimerInvocation()) {
+                return fromMillis(ctx.firingTimer.timestamp);
+            }
+            if (ctx.isEvalInvocation() && onTimeColumnName != null) {
+                ArgumentInfo argInfo = 
argumentsByName.get(ctx.tableArgumentName);
+                if (argInfo instanceof TableArgumentInfo) {
+                    TableArgumentInfo tableArg = (TableArgumentInfo) argInfo;
+                    if 
(!getFieldNames(tableArg.dataType).contains(onTimeColumnName)) {
+                        return null;
+                    }
+                }
+                Object timeValue = ctx.row.getField(onTimeColumnName);
+                if (timeValue == null) {
+                    return null;
+                }
+                return fromMillis(toMillis(timeValue));
+            }
+            return null;
+        }
+
+        @Override
+        public TimeType tableWatermark() {
+            InvocationContext ctx = currentInvocation;
+            if (!ctx.isEvalInvocation()) {
+                return null;
+            }
+            Long wm = timerManager.getWatermarkForTable(ctx.tableArgumentName);
+            return wm != null ? fromMillis(wm) : null;
+        }
+
+        @Override
+        public TimeType currentWatermark() {
+            Long wm = timerManager.getGlobalWatermark();
+            return wm != null ? fromMillis(wm) : null;
+        }
+
+        @Override
+        public void registerOnTime(String name, TimeType time) {
+            checkTimersEnabled();
+            checkNotNull(name, "Timer name must not be null");
+            checkNotNull(time, "Timer timestamp must not be null");
+            timerManager.register(currentInvocation.partitionKey, 
toMillis(time), name);
+        }
+
+        @Override
+        public void registerOnTime(TimeType time) {
+            checkTimersEnabled();
+            checkNotNull(time, "Timer timestamp must not be null");
+            timerManager.register(currentInvocation.partitionKey, 
toMillis(time), null);
+        }
+
+        @Override
+        public void clearTimer(String name) {
+            checkNotNull(name, "Timer name must not be null");
+            timerManager.clearByName(currentInvocation.partitionKey, name);
+        }
+
+        @Override
+        public void clearTimer(TimeType time) {
+            checkNotNull(time, "Timer timestamp must not be null");
+            timerManager.clearByTimestamp(currentInvocation.partitionKey, 
toMillis(time));
+        }
+
+        @Override
+        public void clearAllTimers() {
+            timerManager.clearAll(currentInvocation.partitionKey);
+        }
+
+        private void checkTimersEnabled() {
+            boolean enabled =
+                    ArgumentInfo.filterTableArguments(arguments).stream()
+                            .anyMatch(
+                                    t ->
+                                            t.isSetSemantic
+                                                    && t.prependStrategy
+                                                            != 
OutputPrependStrategy.ALL_COLUMNS);
+            if (!enabled) {
+                throw new TableRuntimeException(
+                        "Timers are not supported in the current PTF 
declaration. "
+                                + "Note that only PTFs that take set semantic 
tables support timers. "
+                                + "Also timers are not available for advanced 
traits such as "
+                                + "supporting pass-through columns or 
updates.");
+            }
+        }
+
+        private TimeType fromMillis(long millis) {
+            return convertFromMillis(millis, conversionClass);
+        }
+
+        private long toMillis(Object time) {
+            if (time instanceof Long) {
+                return (Long) time;
+            } else if (time instanceof Instant) {
+                return ((Instant) time).toEpochMilli();
+            } else if (time instanceof LocalDateTime) {
+                return DateTimeUtils.toTimestampMillis((LocalDateTime) time);
+            } else if (time instanceof java.sql.Timestamp) {
+                return ((java.sql.Timestamp) time).getTime();
+            }
+            throw new IllegalArgumentException(
+                    "Unsupported time type: " + 
time.getClass().getSimpleName());
+        }
+    }
+
+    private class TestOnTimerContext extends TestContext
+            implements ProcessTableFunction.OnTimerContext {
+        TestOnTimerContext(Map<String, Object> stateMap) {

Review Comment:
   nit
   
   ```suggestion
   
           TestOnTimerContext(Map<String, Object> stateMap) {
   ```



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ResolvedMethod.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * A resolved PTF method (eval or onTimer) paired with whether its first 
parameter accepts a
+ * Context.
+ */
+class ResolvedMethod {

Review Comment:
   I think this class could be typed on the contextClass.
   
   ```suggestion
   class ResolvedMethod<T extends ProcessTableFunction.Context> {
   ```



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {
+        return Stream.concat(
+                        timerManager.getPendingTimers().stream(),
+                        timerManager.getFiredTimers().stream())
+                .sorted()
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all pending (not yet fired) timers, sorted by timestamp then 
name. */
+    public List<Timer> getPendingTimers() {
+        return timerManager.getPendingTimers();
+    }
+
+    /** Returns all pending timers with the given name. */
+    public List<Timer> getPendingTimers(String timerName) {
+        return timerManager.getPendingTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Returns all timers that have fired, in the order they fired. */
+    public List<Timer> getFiredTimers() {
+        return timerManager.getFiredTimers();
+    }
+
+    /** Returns all fired timers with the given name. */
+    public List<Timer> getFiredTimers(String timerName) {
+        return timerManager.getFiredTimers().stream()
+                .filter(t -> timerName.equals(t.getName()))
+                .collect(Collectors.toList());
+    }
+
+    /** Clears the fired timer history. */
+    public void clearFiredTimers() {
+        timerManager.clearFiredTimers();
+    }
+
+    private void setWatermarkMillis(long millis) throws Exception {
+        checkState(isOpen, "Harness is not open");
+        for (TableArgumentInfo tableArg : 
ArgumentInfo.filterTableArguments(arguments)) {
+            timerManager.setTableWatermark(tableArg.name, millis);
+        }
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void setWatermarkForTableMillis(String tableArgument, long millis) 
throws Exception {
+        checkState(isOpen, "Harness is not open");
+        checkNotNull(tableArgument, "tableArgument must not be null");
+        checkArgument(
+                argumentsByName.get(tableArgument) instanceof 
TableArgumentInfo,
+                "Unknown or non-table argument: %s",
+                tableArgument);
+        timerManager.setTableWatermark(tableArgument, millis);
+        timerManager.updateGlobalWatermarkAndFireTimers(this::fireTimer);
+    }
+
+    private void fireTimer(Timer timer) throws Exception {
+        if (onTimer == null) {
+            throw new IllegalStateException(
+                    "Timer fired but no onTimer() method is defined in "
+                            + function.getClass().getSimpleName());
+        }
+
+        currentInvocation = InvocationContext.forTimer(timer);
+
+        try {
+            Map<String, Object> stateMap = 
stateManager.loadStateForKey(timer.partitionKey);
+
+            List<StateArgumentInfo> stateArgs = 
ArgumentInfo.filterStateArguments(arguments);
+            Object[] methodArgs = new Object[stateArgs.size()];
+            for (int i = 0; i < stateArgs.size(); i++) {
+                methodArgs[i] = stateMap.get(stateArgs.get(i).name);
+            }
+
+            onTimer.invoke(function, new TestOnTimerContext(stateMap), 
methodArgs);
+            stateManager.updateStateForKey(timer.partitionKey, stateMap);
+        } catch (InvocationTargetException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof Exception) {
+                throw (Exception) cause;
+            }
+            throw new RuntimeException("onTimer() invocation failed", e);
+        } finally {
+            currentInvocation = null;
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Context implementations
+    // 
-------------------------------------------------------------------------
+
+    private class TestContext implements ProcessTableFunction.Context {
+        final Map<String, Object> stateMap;
+
+        TestContext(Map<String, Object> stateMap) {
+            this.stateMap = stateMap;
+        }
+
+        @Override
+        public <TimeType> ProcessTableFunction.TimeContext<TimeType> 
timeContext(
+                Class<TimeType> conversionClass) {
+            return new TestTimeContext<>(conversionClass);
+        }
+
+        @Override
+        public TableSemantics tableSemanticsFor(String argName) {
+            ArgumentInfo argInfo = argumentsByName.get(argName);
+            if (argInfo == null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' not found. Available arguments: 
%s",
+                                argName, argumentsByName.keySet()));
+            }
+            if (!(argInfo instanceof TableArgumentInfo)) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Argument '%s' is not a table argument (type: 
%s)",
+                                argName, argInfo.getClass().getSimpleName()));
+            }
+            TableArgumentInfo tableArg = (TableArgumentInfo) argInfo;
+            int[] partitionIndices = getPartitionColumnIndices(tableArg);
+            int timeColumn =
+                    onTimeColumnName != null
+                            ? 
getFieldNames(tableArg.dataType).indexOf(onTimeColumnName)
+                            : -1;
+            return new TestHarnessTableSemantics(tableArg.dataType, 
partitionIndices, timeColumn);
+        }
+
+        @Override
+        public void clearState(String stateName) {
+            stateMap.remove(stateName);
+        }
+
+        @Override
+        public void clearAllState() {
+            stateMap.clear();
+        }
+
+        @Override
+        public void clearAllTimers() {
+            timerManager.clearAll(currentInvocation.partitionKey);
+        }
+
+        @Override
+        public void clearAll() {
+            stateMap.clear();
+            timerManager.clearAll(currentInvocation.partitionKey);
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode() {
+            return ChangelogMode.insertOnly();
+        }
+    }
+
+    private class TestTimeContext<TimeType> implements 
ProcessTableFunction.TimeContext<TimeType> {
+        private final Class<TimeType> conversionClass;
+
+        TestTimeContext(Class<TimeType> conversionClass) {
+            this.conversionClass = conversionClass;
+        }
+
+        @Override
+        public TimeType time() {
+            InvocationContext ctx = currentInvocation;
+            if (ctx.isTimerInvocation()) {
+                return fromMillis(ctx.firingTimer.timestamp);
+            }
+            if (ctx.isEvalInvocation() && onTimeColumnName != null) {
+                ArgumentInfo argInfo = 
argumentsByName.get(ctx.tableArgumentName);
+                if (argInfo instanceof TableArgumentInfo) {
+                    TableArgumentInfo tableArg = (TableArgumentInfo) argInfo;
+                    if 
(!getFieldNames(tableArg.dataType).contains(onTimeColumnName)) {
+                        return null;
+                    }
+                }
+                Object timeValue = ctx.row.getField(onTimeColumnName);
+                if (timeValue == null) {
+                    return null;
+                }
+                return fromMillis(toMillis(timeValue));
+            }
+            return null;
+        }
+
+        @Override
+        public TimeType tableWatermark() {
+            InvocationContext ctx = currentInvocation;
+            if (!ctx.isEvalInvocation()) {
+                return null;
+            }
+            Long wm = timerManager.getWatermarkForTable(ctx.tableArgumentName);
+            return wm != null ? fromMillis(wm) : null;
+        }
+
+        @Override
+        public TimeType currentWatermark() {
+            Long wm = timerManager.getGlobalWatermark();
+            return wm != null ? fromMillis(wm) : null;
+        }
+
+        @Override
+        public void registerOnTime(String name, TimeType time) {
+            checkTimersEnabled();
+            checkNotNull(name, "Timer name must not be null");
+            checkNotNull(time, "Timer timestamp must not be null");
+            timerManager.register(currentInvocation.partitionKey, 
toMillis(time), name);
+        }
+
+        @Override
+        public void registerOnTime(TimeType time) {
+            checkTimersEnabled();
+            checkNotNull(time, "Timer timestamp must not be null");
+            timerManager.register(currentInvocation.partitionKey, 
toMillis(time), null);
+        }
+
+        @Override
+        public void clearTimer(String name) {
+            checkNotNull(name, "Timer name must not be null");
+            timerManager.clearByName(currentInvocation.partitionKey, name);
+        }
+
+        @Override
+        public void clearTimer(TimeType time) {
+            checkNotNull(time, "Timer timestamp must not be null");
+            timerManager.clearByTimestamp(currentInvocation.partitionKey, 
toMillis(time));
+        }
+
+        @Override
+        public void clearAllTimers() {
+            timerManager.clearAll(currentInvocation.partitionKey);
+        }
+
+        private void checkTimersEnabled() {
+            boolean enabled =
+                    ArgumentInfo.filterTableArguments(arguments).stream()
+                            .anyMatch(
+                                    t ->
+                                            t.isSetSemantic
+                                                    && t.prependStrategy
+                                                            != 
OutputPrependStrategy.ALL_COLUMNS);
+            if (!enabled) {
+                throw new TableRuntimeException(
+                        "Timers are not supported in the current PTF 
declaration. "
+                                + "Note that only PTFs that take set semantic 
tables support timers. "
+                                + "Also timers are not available for advanced 
traits such as "
+                                + "supporting pass-through columns or 
updates.");
+            }
+        }
+
+        private TimeType fromMillis(long millis) {
+            return convertFromMillis(millis, conversionClass);
+        }
+
+        private long toMillis(Object time) {
+            if (time instanceof Long) {
+                return (Long) time;
+            } else if (time instanceof Instant) {
+                return ((Instant) time).toEpochMilli();
+            } else if (time instanceof LocalDateTime) {
+                return DateTimeUtils.toTimestampMillis((LocalDateTime) time);
+            } else if (time instanceof java.sql.Timestamp) {
+                return ((java.sql.Timestamp) time).getTime();
+            }
+            throw new IllegalArgumentException(
+                    "Unsupported time type: " + 
time.getClass().getSimpleName());
+        }
+    }
+
+    private class TestOnTimerContext extends TestContext
+            implements ProcessTableFunction.OnTimerContext {
+        TestOnTimerContext(Map<String, Object> stateMap) {
+            super(stateMap);
+        }
+
+        @Override
+        public String currentTimer() {
+            if (currentInvocation.isTimerInvocation()) {
+                return currentInvocation.firingTimer.getName();
+            }
+            return null;
+        }
+    }
+
+    private static int[] getPartitionColumnIndices(TableArgumentInfo arg) {
+        if (arg.partitionColumnNames == null || 
arg.partitionColumnNames.length == 0) {
+            return new int[0];
+        }
+        List<String> fieldNames = getFieldNames(arg.dataType);
+        int[] indices = new int[arg.partitionColumnNames.length];
+        for (int i = 0; i < arg.partitionColumnNames.length; i++) {
+            String colName = arg.partitionColumnNames[i];
+            int index = fieldNames.indexOf(colName);
+            if (index < 0) {
+                throw new IllegalStateException(
+                        "Partition column '"
+                                + colName
+                                + "' not found in table argument. "
+                                + "Available fields: "
+                                + fieldNames);
+            }
+            indices[i] = index;
+        }
+        return indices;
+    }
+
+    @Nullable
+    private Class<?> resolveRowtimeConversionClass(List<TableArgumentInfo> 
tableArguments) {

Review Comment:
   Is it guaranteed (or required) that all table args have the same 
type / class for the `on_time` attribute? Or could table A use `Long` and table 
B use `Instant`?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ResolvedMethod.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import javax.annotation.Nullable;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * A resolved PTF method (eval or onTimer) paired with whether its first 
parameter accepts a
+ * Context.
+ */
+class ResolvedMethod {
+    final Method method;
+    final boolean takesContext;
+
+    static ResolvedMethod of(Method method, Class<?> contextClass) {
+        boolean takesContext =
+                method.getParameterTypes().length > 0
+                        && 
contextClass.isAssignableFrom(method.getParameterTypes()[0]);
+        return new ResolvedMethod(method, takesContext);
+    }
+
+    private ResolvedMethod(Method method, boolean takesContext) {
+        this.method = method;
+        this.takesContext = takesContext;
+    }
+
+    void invoke(Object target, @Nullable Object context, Object[] methodArgs)

Review Comment:
   ```suggestion
       void invoke(Object target, @Nullable T context, Object[] methodArgs)
   ```



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -711,25 +1197,67 @@ public ProcessTableFunctionTestHarness<OUT> build() 
throws Exception {
 
             // Extract table arguments for output type derivation
             // SystemTypeInference needs table semantics for pass-through 
column deduplication
-            List<TableArgumentInfo> tableArgs = 
ArgumentInfo.filterTableArguments(arguments);
+            List<TableArgumentInfo> tableArgInfos = 
ArgumentInfo.filterTableArguments(arguments);
 
             // Derive output schema using SystemTypeInference
             DataType derivedOutputType =
                     deriveOutputTypeFromSystemInference(
-                            function, dataTypeFactory, systemTypeInference, 
tableArgs);
+                            function,
+                            dataTypeFactory,
+                            systemTypeInference,
+                            arguments,
+                            tableArgInfos);
 
             // Create output converter for PTF emissions
             DataStructureConverter<Object, Object> harnessOutputConverter =
                     createPTFOutputConverter(derivedOutputType);
 
+            // Validate onTimeColumn configuration
+            if (onTimeColumnName != null) {
+                boolean foundInAnyTable =
+                        tableArgInfos.stream()
+                                .anyMatch(
+                                        t -> 
getFieldNames(t.dataType).contains(onTimeColumnName));

Review Comment:
   Should we check that the field is a valid timestamp type, and also that the 
field has the same type in all table args that contain it?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -1386,25 +1948,76 @@ private ProcessTableFunction<OUT> instantiateFunction() 
throws IllegalArgumentEx
 
         /**
          * Derives the output schema using SystemTypeInference, including 
field name deduplication.
+         *
+         * <p>SystemTypeInference's staticArgs list includes both 
user-declared arguments and
+         * system-injected arguments (on_time, uid). The CallContext we build 
must mirror this full
+         * list positionally — each index maps to a staticArg. User args get 
their resolved
+         * DataType; system args get placeholder types (DESCRIPTOR for 
on_time, STRING for uid).
+         * Table semantics are attached at the positions of table arguments so 
SystemTypeInference
+         * can perform pass-through column deduplication.
          */
         private DataType deriveOutputTypeFromSystemInference(
                 ProcessTableFunction<OUT> function,
                 DataTypeFactory dataTypeFactory,
                 TypeInference systemTypeInference,
-                List<TableArgumentInfo> arguments) {
+                List<ArgumentInfo> allArguments,
+                List<TableArgumentInfo> tableArguments) {
 
-            List<DataType> argumentDataTypes = new ArrayList<>();
-            for (TableArgumentInfo arg : arguments) {
-                argumentDataTypes.add(arg.dataType);
+            Optional<List<StaticArgument>> staticArgsOpt = 
systemTypeInference.getStaticArguments();
+            List<StaticArgument> staticArgs =
+                    staticArgsOpt.orElseThrow(
+                            () ->
+                                    new IllegalStateException(
+                                            "SystemTypeInference has no static 
arguments"));

Review Comment:
   nit
   ```suggestion
               List<StaticArgument> staticArgs =
                       systemTypeInference.getStaticArguments().orElseThrow(
                               () ->
                                       new IllegalStateException(
                                               "SystemTypeInference has no 
static arguments"));
   ```
   Can these two statements be folded into a single expression?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTimerManager.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Timer and watermark manager for {@link ProcessTableFunctionTestHarness}.
+ *
+ * <p>Handles timer registration, clearing, watermark tracking, and 
determining which timers are
+ * eligible to fire.
+ */
+@Internal
+class TestHarnessTimerManager {

Review Comment:
   It might make sense to add a separate test for this class.



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -1164,40 +1721,45 @@ private List<ArgumentInfo> 
extractAndValidateTypeInference(
         }
 
         /** Creates appropriate StateConverter for the given state data type. 
*/
-        private StateConverter createStateConverter(DataType stateDataType, 
ClassLoader classLoader)
-                throws Exception {
-            LogicalType logicalType = stateDataType.getLogicalType();
+        private StateConverter createStateConverter(
+                DataType stateDataType, ClassLoader classLoader) {
+            DataType resolvedType =
+                    
ListView.class.isAssignableFrom(stateDataType.getConversionClass())
+                                    || MapView.class.isAssignableFrom(
+                                            stateDataType.getConversionClass())
+                            ? stateDataType.getChildren().get(0)

Review Comment:
   Wouldn't this just return the key type for `MapView`?



##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -303,38 +332,419 @@ public void clearStateForKey(String stateName, Row 
partitionKey) {
         stateManager.clearStateForKey(stateName, partitionKey);
     }
 
+    // 
-------------------------------------------------------------------------
+    // Watermark & Timer API
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * Sets the watermark for all tables to the given {@link LocalDateTime} 
and fires eligible
+     * timers.
+     */
+    public void setWatermark(LocalDateTime watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /** Sets the watermark for all tables to the given {@link Instant} and 
fires eligible timers. */
+    public void setWatermark(Instant watermark) throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkMillis(watermark.toEpochMilli());
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link 
LocalDateTime} and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, LocalDateTime 
watermark)
+            throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, 
DateTimeUtils.toTimestampMillis(watermark));
+    }
+
+    /**
+     * Sets the watermark for a specific table to the given {@link Instant} 
and fires eligible
+     * timers.
+     */
+    public void setWatermarkForTable(String tableArgument, Instant watermark) 
throws Exception {
+        checkNotNull(watermark, "watermark must not be null");
+        setWatermarkForTableMillis(tableArgument, watermark.toEpochMilli());
+    }
+
+    /** Returns all timers (both pending and fired), sorted by timestamp then 
name. */
+    public List<Timer> getTimers() {

Review Comment:
   Should we also have a method `getAllTimers(Row partitionKey)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to