This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d948d82508 [FLINK-39377][table] Add initial implementation of 
ProcessTableFunctionTestHarness (#27928)
5d948d82508 is described below

commit 5d948d82508d3d33aad499352d809435488a0308
Author: Mika Naylor <[email protected]>
AuthorDate: Mon May 18 15:46:31 2026 +0200

    [FLINK-39377][table] Add initial implementation of 
ProcessTableFunctionTestHarness (#27928)
    
    This commit introduces an initial implementation of a test harness for 
PTFs, for use
    in unit tests that do not require a running Flink cluster.
    
    The implementation supports setting up the harness by configuration various 
test
    parameters, like fixtures for scalar arguments, datatypes for table 
arguments and
    partition settings for table arguments with set semantics.
    
    The harness on build does type and structure validation, as well as 
ensuring the
    test setup can handle the arguments defined on the PTF.
    
    It supports PTFs that use scalar, set semantic table and row semantic table 
arguments,
    as well as PTFs that have multiple of each. It supports PASS_COLUMN_THROUGH 
and
    OPTIONAL_PARTITION_BY traits.
    
    It currently does not support State, or Context (so no timers). It also 
does not enforce
    some static argument traits like SUPPORTS_UPDATES, REQUIRE_UPDATE_BEFORE.
---
 docs/content.zh/docs/dev/table/functions/ptfs.md   |  319 +++++
 docs/content/docs/dev/table/functions/ptfs.md      |  321 ++++-
 flink-table/flink-table-test-utils/pom.xml         |   11 +-
 .../functions/ProcessTableFunctionTestHarness.java | 1313 ++++++++++++++++++++
 .../runtime/functions/TestHarnessCallContext.java  |  102 ++
 .../functions/TestHarnessDataTypeFactory.java      |   99 ++
 .../functions/TestHarnessTableSemantics.java       |   68 +
 .../ProcessTableFunctionTestHarnessTest.java       | 1065 ++++++++++++++++
 flink-tests/pom.xml                                |    7 +
 9 files changed, 3303 insertions(+), 2 deletions(-)

diff --git a/docs/content.zh/docs/dev/table/functions/ptfs.md 
b/docs/content.zh/docs/dev/table/functions/ptfs.md
index 3c0eb9fb7c1..4190e9e3ba5 100644
--- a/docs/content.zh/docs/dev/table/functions/ptfs.md
+++ b/docs/content.zh/docs/dev/table/functions/ptfs.md
@@ -2034,3 +2034,322 @@ Limitations
 PTFs are in an early stage. The following limitations apply:
 - PTFs cannot run in batch mode.
 - Broadcast state
+
+Testing Process Table Functions
+-------------------------------
+
+The `ProcessTableFunctionTestHarness` provides a lightweight unit testing 
framework for Process Table
+Functions (PTFs). It is useful for unit testing and validating PTF business 
logic, multi-table PTF
+behaviour and validating errors.
+
+For end-to-end integration testing with the full Flink planner and runtime, 
use integration tests
+instead.
+
+{{< top >}}
+
+### Quick Start
+
+{{< tabs "quickstart" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.table.annotation.*;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import 
org.apache.flink.table.runtime.functions.ProcessTableFunctionTestHarness;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+// PTF under test
+@DataTypeHint("ROW<doubled INT>")
+public class DoublePTF extends ProcessTableFunction<Row> {
+  public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
+    int value = input.getFieldAs("value");
+    collect(Row.of(value * 2));
+  }
+}
+
+// Test
+@Test
+void testDoublePTF() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(DoublePTF.class)
+    .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+    .build()) {
+
+    harness.processElement(Row.of(5));
+    harness.processElement(Row.of(10));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output).hasSize(2);
+    assertThat(output.get(0)).isEqualTo(Row.of(10));
+    assertThat(output.get(1)).isEqualTo(Row.of(20));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
+
+### Common Testing Scenarios
+
+#### Testing Row-Semantic Tables
+
+Use `.withTableArgument()` to configure the input table schema:
+
+{{< tabs "row-semantic" >}}
+{{< tab "Java" >}}
+```java
+public class PassthroughPTF extends ProcessTableFunction<Integer> {
+  public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
+    collect(input.getFieldAs("value"));
+  }
+}
+
+@Test
+void testPassthrough() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class)
+    .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+    .build()) {
+
+    harness.processElement(Row.of(42));
+    harness.processElement(Row.of(100));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output).containsExactly(42, 100);
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Testing Set-Semantic Tables with Partitioning
+
+For `SET_SEMANTIC_TABLE`, use `.withPartitionBy()` to configure partition 
columns:
+
+{{< tabs "set-semantic" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<doubled INT>")
+public class PartitionedPTF extends ProcessTableFunction<Row> {
+  public void eval(@ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
+    int value = input.getFieldAs("value");
+    collect(Row.of(value * 2));
+  }
+}
+
+@Test
+void testPartitionedPTF() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class)
+    .withTableArgument("input", DataTypes.of("ROW<key STRING, value INT>"))
+    .withPartitionBy("input", "key")
+    .build()) {
+
+    harness.processElement(Row.of("A", 10));
+    harness.processElement(Row.of("B", 20));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output.get(0)).isEqualTo(Row.of("A", 20));
+    assertThat(output.get(1)).isEqualTo(Row.of("B", 40));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Testing Multiple Table Arguments
+
+Use `processElementForTable()` to specify which table receives each row:
+
+{{< tabs "multi-table" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<output STRING>")
+public class JoinPTF extends ProcessTableFunction<Row> {
+  public void eval(
+    @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row left,
+    @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row right) {
+    if (left != null) {
+      collect(Row.of("LEFT: " + left));
+    }
+    if (right != null) {
+      collect(Row.of("RIGHT: " + right));
+    }
+  }
+}
+
+@Test
+void testMultiTable() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(JoinPTF.class)
+    .withTableArgument("left", DataTypes.of("ROW<id INT, name STRING>"))
+    .withPartitionBy("left", "id")
+    .withTableArgument("right", DataTypes.of("ROW<id INT, city STRING>"))
+    .withPartitionBy("right", "id")
+    .build()) {
+
+    // Use processElementForTable() to target specific tables
+    harness.processElementForTable("left", Row.of(1, "Alice"));
+    harness.processElementForTable("right", Row.of(1, "Berlin"));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output.get(0)).isEqualTo(Row.of(1, null, "LEFT: +I[1, Alice]"));
+    assertThat(output.get(1)).isEqualTo(Row.of(null, 1, "RIGHT: +I[1, 
Berlin]"));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Testing with Scalar Arguments
+
+Use `.withScalarArgument()` to configure scalar parameter values:
+
+{{< tabs "scalar-args" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<value INT>")
+public class FilterPTF extends ProcessTableFunction<Row> {
+  public void eval(
+    @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input,
+    int threshold) {
+    int value = input.getFieldAs("value");
+    if (value > threshold) {
+      collect(Row.of(value));
+    }
+  }
+}
+
+@Test
+void testFilter() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(FilterPTF.class)
+    .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+    .withScalarArgument("threshold", 50) // Configure scalar value
+    .build()) {
+
+    harness.processElement(Row.of(30));
+    harness.processElement(Row.of(70));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output).containsExactly(Row.of(70));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Scalar-Only PTFs**: For PTFs with only scalar arguments, use `process()` to 
trigger evaluation:
+
+{{< tabs "scalar-only" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<sum INT>")
+public class AddPTF extends ProcessTableFunction<Row> {
+  public void eval(int a, int b) {
+    collect(Row.of(a + b));
+  }
+}
+
+@Test
+void testScalarOnly() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(AddPTF.class)
+    .withScalarArgument("a", 5)
+    .withScalarArgument("b", 7)
+    .build()) {
+
+    harness.process(); // Use process() instead of processElement()
+
+    List<Row> output = harness.getOutput();
+    assertThat(output).containsExactly(Row.of(12));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Configuring Table Argument Types
+
+In contexts where the harness can't infer the table argument types for table 
arguments (when using unannotated `Row` inputs,
+for example), it is possible to specify the input table type during the 
builder setup.
+
+{{< tabs "type-config" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<doubled INT, original INT>")
+// Note - An ArgumentHint without a DataTypeHint does not provide a type for 
the annotated input argument.
+// In cases like this, the harness cannot infer type, and so it needs to be 
declared
+// during harness setup.
+public static class DoublePTF extends ProcessTableFunction<Row> {
+  public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
+    int value = (Integer) input.getField(0);
+    collect(Row.of(value * 2, value));
+  }
+}
+
+@Test
+void testBuilderType() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(DoublePTF.class)
+    .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+    .build()) {
+
+    harness.processElement(Row.of(5));
+    assertThat(harness.getOutput()).containsExactly(Row.of(10));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Structured Types**: The harness supports structured POJO types in addition 
to `Row`, both as PTF inputs
+and outputs:
+
+{{< tabs "pojo-types" >}}
+{{< tab "Java" >}}
+```java
+public static class Customer {
+  public String name;
+  public int age;
+}
+
+public class CustomerPTF extends ProcessTableFunction<Customer> {
+  public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Customer c) 
{
+    collect(c);
+  }
+}
+
+@Test
+void testPOJO() throws Exception {
+  try (ProcessTableFunctionTestHarness<Customer> harness =
+      ProcessTableFunctionTestHarness.ofClass(CustomerPTF.class)
+          .withTableArgument("input", DataTypes.of(Customer.class))
+          .build()) {
+
+    harness.processElement(Row.of("Alice", 30));
+
+    List<Customer> output = harness.getOutput();
+    assertThat(output.get(0).name).isEqualTo("Alice");
+    assertThat(output.get(0).age).isEqualTo(30);
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
+
+### PTF Features Unsupported by the TestHarness
+
+- `Context` paramter
+- State (`@StateHint`)
+- Timers (`onTimer`)
+- `on_time` / `rowtime`
+- Update traits (`SUPPORTS_UPDATES`, `REQUIRE_UPDATE_BEFORE`)
diff --git a/docs/content/docs/dev/table/functions/ptfs.md 
b/docs/content/docs/dev/table/functions/ptfs.md
index 4bc78dc8a20..7181fc1a3d1 100644
--- a/docs/content/docs/dev/table/functions/ptfs.md
+++ b/docs/content/docs/dev/table/functions/ptfs.md
@@ -1025,7 +1025,7 @@ class TimerFunction extends ProcessTableFunction<String> {
 ### Handling of Late Records
 
 A late record is a record with a time attribute value that is less than or 
equal to the current
-watermark. PTFs handle late records just like non-late records by calling the 
`eval()` method. If 
+watermark. PTFs handle late records just like non-late records by calling the 
`eval()` method. If
 the `on_time` argument is specified, the late timestamp is preserved in the 
output. This behavior is
 the same for PTFs with row and set semantics.
 
@@ -2037,3 +2037,322 @@ Limitations
 PTFs are in an early stage. The following limitations apply:
 - PTFs cannot run in batch mode.
 - Broadcast state
+
+Testing Process Table Functions
+-------------------------------
+
+The `ProcessTableFunctionTestHarness` provides a lightweight unit testing 
framework for Process Table
+Functions (PTFs). It is useful for unit testing and validating PTF business 
logic, multi-table PTF
+behaviour and validating errors.
+
+For end-to-end integration testing with the full Flink planner and runtime, 
use integration tests
+instead.
+
+{{< top >}}
+
+### Quick Start
+
+{{< tabs "quickstart" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.table.annotation.*;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import 
org.apache.flink.table.runtime.functions.ProcessTableFunctionTestHarness;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+// PTF under test
+@DataTypeHint("ROW<doubled INT>")
+public class DoublePTF extends ProcessTableFunction<Row> {
+  public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
+    int value = input.getFieldAs("value");
+    collect(Row.of(value * 2));
+  }
+}
+
+// Test
+@Test
+void testDoublePTF() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(DoublePTF.class)
+    .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+    .build()) {
+
+    harness.processElement(Row.of(5));
+    harness.processElement(Row.of(10));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output).hasSize(2);
+    assertThat(output.get(0)).isEqualTo(Row.of(10));
+    assertThat(output.get(1)).isEqualTo(Row.of(20));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
+
+### Common Testing Scenarios
+
+#### Testing Row-Semantic Tables
+
+Use `.withTableArgument()` to configure the input table schema:
+
+{{< tabs "row-semantic" >}}
+{{< tab "Java" >}}
+```java
+public class PassthroughPTF extends ProcessTableFunction<Integer> {
+  public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
+    collect(input.getFieldAs("value"));
+  }
+}
+
+@Test
+void testPassthrough() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class)
+    .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+    .build()) {
+
+    harness.processElement(Row.of(42));
+    harness.processElement(Row.of(100));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output).containsExactly(42, 100);
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Testing Set-Semantic Tables with Partitioning
+
+For `SET_SEMANTIC_TABLE`, use `.withPartitionBy()` to configure partition 
columns:
+
+{{< tabs "set-semantic" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<doubled INT>")
+public class PartitionedPTF extends ProcessTableFunction<Row> {
+  public void eval(@ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
+    int value = input.getFieldAs("value");
+    collect(Row.of(value * 2));
+  }
+}
+
+@Test
+void testPartitionedPTF() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class)
+    .withTableArgument("input", DataTypes.of("ROW<key STRING, value INT>"))
+    .withPartitionBy("input", "key")
+    .build()) {
+
+    harness.processElement(Row.of("A", 10));
+    harness.processElement(Row.of("B", 20));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output.get(0)).isEqualTo(Row.of("A", 20));
+    assertThat(output.get(1)).isEqualTo(Row.of("B", 40));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Testing Multiple Table Arguments
+
+Use `processElementForTable()` to specify which table receives each row:
+
+{{< tabs "multi-table" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<output STRING>")
+public class JoinPTF extends ProcessTableFunction<Row> {
+  public void eval(
+    @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row left,
+    @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row right) {
+    if (left != null) {
+      collect(Row.of("LEFT: " + left));
+    }
+    if (right != null) {
+      collect(Row.of("RIGHT: " + right));
+    }
+  }
+}
+
+@Test
+void testMultiTable() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(JoinPTF.class)
+    .withTableArgument("left", DataTypes.of("ROW<id INT, name STRING>"))
+    .withPartitionBy("left", "id")
+    .withTableArgument("right", DataTypes.of("ROW<id INT, city STRING>"))
+    .withPartitionBy("right", "id")
+    .build()) {
+
+    // Use processElementForTable() to target specific tables
+    harness.processElementForTable("left", Row.of(1, "Alice"));
+    harness.processElementForTable("right", Row.of(1, "Berlin"));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output.get(0)).isEqualTo(Row.of(1, null, "LEFT: +I[1, Alice]"));
+    assertThat(output.get(1)).isEqualTo(Row.of(null, 1, "RIGHT: +I[1, 
Berlin]"));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Testing with Scalar Arguments
+
+Use `.withScalarArgument()` to configure scalar parameter values:
+
+{{< tabs "scalar-args" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<value INT>")
+public class FilterPTF extends ProcessTableFunction<Row> {
+  public void eval(
+    @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input,
+    int threshold) {
+    int value = input.getFieldAs("value");
+    if (value > threshold) {
+      collect(Row.of(value));
+    }
+  }
+}
+
+@Test
+void testFilter() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(FilterPTF.class)
+    .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+    .withScalarArgument("threshold", 50) // Configure scalar value
+    .build()) {
+
+    harness.processElement(Row.of(30));
+    harness.processElement(Row.of(70));
+
+    List<Row> output = harness.getOutput();
+    assertThat(output).containsExactly(Row.of(70));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Scalar-Only PTFs**: For PTFs with only scalar arguments, use `process()` to 
trigger evaluation:
+
+{{< tabs "scalar-only" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<sum INT>")
+public class AddPTF extends ProcessTableFunction<Row> {
+  public void eval(int a, int b) {
+    collect(Row.of(a + b));
+  }
+}
+
+@Test
+void testScalarOnly() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(AddPTF.class)
+    .withScalarArgument("a", 5)
+    .withScalarArgument("b", 7)
+    .build()) {
+
+    harness.process(); // Use process() instead of processElement()
+
+    List<Row> output = harness.getOutput();
+    assertThat(output).containsExactly(Row.of(12));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Configuring Table Argument Types
+
+In contexts where the harness can't infer the table argument types for table 
arguments (when using unannotated `Row` inputs,
+for example), it is possible to specify the input table type during the 
builder setup.
+
+{{< tabs "type-config" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<doubled INT, original INT>")
+// Note - An ArgumentHint without a DataTypeHint does not provide a type for 
the annotated input argument.
+// In cases like this, the harness cannot infer type, and so it needs to be 
declared
+// during harness setup.
+public static class DoublePTF extends ProcessTableFunction<Row> {
+  public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
+    int value = (Integer) input.getField(0);
+    collect(Row.of(value * 2, value));
+  }
+}
+
+@Test
+void testBuilderType() throws Exception {
+  try (ProcessTableFunctionTestHarness<Row> harness =
+    ProcessTableFunctionTestHarness.ofClass(DoublePTF.class)
+    .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+    .build()) {
+
+    harness.processElement(Row.of(5));
+    assertThat(harness.getOutput()).containsExactly(Row.of(10));
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Structured Types**: The harness supports structured POJO types in addition 
to `Row`, both as PTF inputs
+and outputs:
+
+{{< tabs "pojo-types" >}}
+{{< tab "Java" >}}
+```java
+public static class Customer {
+  public String name;
+  public int age;
+}
+
+public class CustomerPTF extends ProcessTableFunction<Customer> {
+  public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Customer c) 
{
+    collect(c);
+  }
+}
+
+@Test
+void testPOJO() throws Exception {
+  try (ProcessTableFunctionTestHarness<Customer> harness =
+      ProcessTableFunctionTestHarness.ofClass(CustomerPTF.class)
+          .withTableArgument("input", DataTypes.of(Customer.class))
+          .build()) {
+
+    harness.processElement(Row.of("Alice", 30));
+
+    List<Customer> output = harness.getOutput();
+    assertThat(output.get(0).name).isEqualTo("Alice");
+    assertThat(output.get(0).age).isEqualTo(30);
+  }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
+
+### PTF Features Unsupported by the TestHarness
+
+- `Context` paramter
+- State (`@StateHint`)
+- Timers (`onTimer`)
+- `on_time` / `rowtime`
+- Update traits (`SUPPORTS_UPDATES`, `REQUIRE_UPDATE_BEFORE`)
diff --git a/flink-table/flink-table-test-utils/pom.xml 
b/flink-table/flink-table-test-utils/pom.xml
index 66c5b7e9fb0..590f4d52359 100644
--- a/flink-table/flink-table-test-utils/pom.xml
+++ b/flink-table/flink-table-test-utils/pom.xml
@@ -56,7 +56,7 @@ under the License.
                </dependency>
                <dependency>
                        <groupId>org.apache.flink</groupId>
-                       <artifactId>flink-table-runtime</artifactId>
+                       <artifactId>flink-table-type-utils</artifactId>
                        <version>${project.version}</version>
                </dependency>
                <dependency>
@@ -65,6 +65,15 @@ under the License.
                        <version>${project.version}</version>
                </dependency>
 
+               <dependency>
+                       <groupId>org.codehaus.janino</groupId>
+                       <artifactId>janino</artifactId>
+               </dependency>
+               <dependency>
+                       <groupId>org.codehaus.janino</groupId>
+                       <artifactId>commons-compiler</artifactId>
+               </dependency>
+
                <!-- We bring the assertions from flink-table-common and 
repackage them -->
                <dependency>
                        <groupId>org.apache.flink</groupId>
diff --git 
a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java
 
b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java
new file mode 100644
index 00000000000..425c88c3753
--- /dev/null
+++ 
b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java
@@ -0,0 +1,1313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.annotation.ArgumentTrait;
+import org.apache.flink.table.annotation.StateHint;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.FunctionKind;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.inference.StaticArgument;
+import org.apache.flink.table.types.inference.StaticArgumentTrait;
+import org.apache.flink.table.types.inference.SystemTypeInference;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Parameter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Test harness for {@link ProcessTableFunction}.
+ *
+ * <p>Provides a fluent builder API for configuring and testing 
ProcessTableFunctions (PTFs) with
+ * table and scalar arguments, lifecycle management, and output collection.
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * ProcessTableFunctionTestHarness<Row> harness =
+ *     ProcessTableFunctionTestHarness.ofClass(MyPTF.class)
+ *         .withTableArgument("input", DataTypes.of("ROW<id INT, name 
STRING>"))
+ *         .withScalarArgument("threshold", 100)
+ *         .build();
+ *
+ * harness.processElement(Row.of(1, "Alice"));
+ * harness.processElement(Row.of(2, "Bob"));
+ *
+ * List<Row> output = harness.getOutput();
+ * }</pre>
+ */
+@PublicEvolving
+public class ProcessTableFunctionTestHarness<OUT> implements AutoCloseable {
+
+    /** Holds input and output converters for a table argument. */
+    private static class ConverterPair {
+        final DataStructureConverter<Object, Object> input;
+        final DataStructureConverter<Object, Object> output;
+
+        ConverterPair(
+                DataStructureConverter<Object, Object> input,
+                DataStructureConverter<Object, Object> output) {
+            this.input = input;
+            this.output = output;
+        }
+    }
+
+    private final ProcessTableFunction<OUT> function;
+    private final FunctionContext functionContext;
+    private final List<OUT> output;
+    private boolean isOpen;
+    private final HarnessCollector collector;
+
+    private final String defaultTableArgument;
+    private final Method evalMethod;
+    private final List<ArgumentInfo> arguments;
+
+    private final Map<String, ArgumentInfo> argumentsByName;
+    private final boolean isSingleTableFunction;
+    private final Map<String, Object> scalarArgumentValues;
+
+    private boolean hasTableArguments = false;
+    private final Map<String, ConverterPair> argumentConverters;
+    private final DataStructureConverter<Object, Object> 
harnessOutputConverter;
+
+    private ProcessTableFunctionTestHarness(
+            ProcessTableFunction<OUT> function,
+            FunctionContext functionContext,
+            Method evalMethod,
+            List<ArgumentInfo> arguments,
+            Map<String, Object> scalarArgumentValues,
+            Map<String, ConverterPair> argumentConverters,
+            DataStructureConverter<Object, Object> harnessOutputConverter)
+            throws Exception {
+        this.function = function;
+        this.functionContext = functionContext;
+        this.evalMethod = evalMethod;
+        this.arguments = arguments;
+        this.scalarArgumentValues = scalarArgumentValues;
+        this.argumentConverters = argumentConverters;
+        this.harnessOutputConverter = harnessOutputConverter;
+        this.output = new ArrayList<>();
+        this.collector = new HarnessCollector();
+        this.isOpen = false;
+
+        this.argumentsByName = new HashMap<>();
+        for (ArgumentInfo arg : arguments) {
+            if (arg.name != null) {
+                argumentsByName.put(arg.name, arg);
+            }
+        }
+
+        final List<ArgumentInfo> tableArguments = new ArrayList<>();
+        for (ArgumentInfo arg : arguments) {
+            if (arg.isTableArgument) {
+                tableArguments.add(arg);
+                this.hasTableArguments = true;
+            }
+        }
+
+        if (tableArguments.size() == 1) {
+            this.defaultTableArgument = tableArguments.get(0).name;
+            this.isSingleTableFunction = true;
+        } else {
+            this.defaultTableArgument = null;
+            this.isSingleTableFunction = false;
+        }
+
+        openFunction();
+    }
+
+    /** Creates a new harness builder for the given ProcessTableFunction 
class. */
+    public static <OUT> Builder<OUT> ofClass(
+            Class<? extends ProcessTableFunction<OUT>> functionClass) {
+        return new Builder<>(functionClass);
+    }
+
+    private void openFunction() throws Exception {
+        function.open(functionContext);
+        function.setCollector(collector);
+        isOpen = true;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (isOpen) {
+            function.close();
+            isOpen = false;
+        }
+    }
+
+    /**
+     * Process a single element for the default table argument.
+     *
+     * <p>For PTFs with a single table argument, this processes one row. For 
multiple table
+     * arguments, use {@link #processElementForTable(String, Row)}.
+     */
+    public void processElement(Row row) throws Exception {
+        if (!isSingleTableFunction) {
+            throw new IllegalStateException(
+                    "PTF has multiple table arguments. Use 
processElementForTable(argumentName, row) "
+                            + "to specify which table argument should receive 
the row.");
+        }
+
+        processElementForTable(defaultTableArgument, row);
+    }
+
+    /** Process a single element constructed from values. */
+    public void processElement(Object... values) throws Exception {
+        processElement(Row.of(values));
+    }
+
+    /** Process a single element with a specific RowKind. */
+    public void processElement(RowKind rowKind, Object... values) throws 
Exception {
+        processElement(Row.ofKind(rowKind, values));
+    }
+
+    /** Process a single element for a specific table argument. */
+    public void processElementForTable(String tableArgument, Row row) throws 
Exception {
+        checkState(isOpen, "Harness not open");
+        checkNotNull(tableArgument, "tableArgument must not be null");
+
+        ArgumentInfo tableArg = argumentsByName.get(tableArgument);
+        if (tableArg == null) {
+            throw new IllegalArgumentException("Unknown table argument: " + 
tableArgument);
+        }
+        invokeEval(tableArg, row);
+    }
+
+    /** Process a single element for a specific table argument. */
+    public void processElementForTable(String tableArgument, Object... values) 
throws Exception {
+        processElementForTable(tableArgument, Row.of(values));
+    }
+
+    /** Process a single element for a specific table argument with RowKind. */
+    public void processElementForTable(String tableArgument, RowKind rowKind, 
Object... values)
+            throws Exception {
+        processElementForTable(tableArgument, Row.ofKind(rowKind, values));
+    }
+
+    /**
+     * Processes the PTF's eval() method with scalar arguments only.
+     *
+     * <p>This method is specifically for scalar-only PTFs (PTFs with only 
scalar arguments and no
+     * table arguments). For PTFs that accept table arguments, use {@link 
#processElement(Row)} or
+     * {@link #processElementForTable(String, Row)} instead.
+     *
+     * @throws IllegalStateException if the PTF has any table arguments
+     * @throws Exception if the eval() invocation fails
+     */
+    public void process() throws Exception {
+        checkState(isOpen, "Harness not open");
+
+        if (hasTableArguments) {
+            throw new IllegalStateException(
+                    "process() is only for scalar-only PTFs. This PTF has 
table arguments. "
+                            + "Use processElement() or 
processElementForTable() instead.");
+        }
+
+        Object[] args = arguments.stream().map(arg -> 
scalarArgumentValues.get(arg.name)).toArray();
+
+        try {
+            evalMethod.invoke(function, args);
+        } catch (InvocationTargetException e) {
+            handleEvalInvocationException(
+                    "Exception occurred during scalar-only PTF eval() 
invocation.\n", args, e);
+        }
+    }
+
+    /** Returns all collected output rows. */
+    public List<OUT> getOutput() {
+        return List.copyOf(output);
+    }
+
+    /** Clears all collected output. */
+    public void clearOutput() {
+        output.clear();
+    }
+
+    /**
+     * Given a target table argument and a row to process, construct the right 
set of arguments for
+     * the PTF's eval function and attempt to invoke it.
+     */
+    private void invokeEval(ArgumentInfo activeTableArg, Row activeRow) throws 
Exception {
+        // Set collector context so it can prepend columns if needed
+        collector.setContext(activeTableArg, activeRow);
+
+        Object[] args = new Object[arguments.size()];
+
+        for (int i = 0; i < arguments.size(); i++) {
+            ArgumentInfo arg = arguments.get(i);
+
+            if (arg.isTableArgument && arg.name.equals(activeTableArg.name)) {
+                // If the argument is the active table argument, first convert 
the input row
+                // to an internal RowData type, and then convert the RowData 
to type that the
+                // argument expects. For Rows, this will structure the Row 
based on the table
+                // argument structure. Otherwise, for POJOs, it will pass the 
expected POJO to eval.
+
+                ConverterPair pair = argumentConverters.get(arg.name);
+
+                args[i] = 
pair.output.toExternalOrNull(pair.input.toInternalOrNull(activeRow));
+
+            } else if (arg.isScalar) {
+                args[i] = scalarArgumentValues.get(arg.name);
+
+            } else if (arg.isTableArgument) {
+                // Inactive table arguments receive null
+                args[i] = null;
+            } else {
+                throw new IllegalStateException(
+                        "Unexpected argument type at position " + i + ": " + 
arg.name);
+            }
+        }
+
+        try {
+            evalMethod.invoke(function, args);
+        } catch (InvocationTargetException e) {
+            String partitionInfo =
+                    activeTableArg.partitionColumnNames != null
+                                    && 
activeTableArg.partitionColumnNames.length > 0
+                            ? String.format(
+                                    " (partition columns: %s)",
+                                    
Arrays.toString(activeTableArg.partitionColumnNames))
+                            : "";
+            String contextMessage =
+                    String.format(
+                            "Exception occurred during PTF eval() while 
processing table argument '%s'%s.\n",
+                            activeTableArg.name, partitionInfo);
+            handleEvalInvocationException(contextMessage, args, e);
+        }
+    }
+
+    /** Collector implementation that stores output in the harness. */
+    private class HarnessCollector implements Collector<OUT> {
+        private ArgumentInfo activeTableArg;
+        private Row activeRow;
+
+        void setContext(ArgumentInfo tableArg, Row row) {
+            this.activeTableArg = tableArg;
+            this.activeRow = row;
+        }
+
+        @Override
+        public void collect(OUT record) {
+            OUT finalRecord;
+
+            if (activeTableArg == null || !activeTableArg.isTableArgument) {
+                finalRecord = record;
+            } else {
+                switch (activeTableArg.prependStrategy) {
+                    case ALL_COLUMNS:
+                        finalRecord = prependAllColumns(record);
+                        break;
+                    case PARTITION_KEYS:
+                        finalRecord = prependPartitionKeys(record);
+                        break;
+                    case NONE:
+                        finalRecord = record;
+                        break;
+                    default:
+                        throw new IllegalStateException(
+                                "Unknown prepend strategy: " + 
activeTableArg.prependStrategy);
+                }
+            }
+
+            // After prepending, round-trip through converter to ensure output 
has proper
+            // field structure from derived output schema
+            OUT structuredRecord = applyOutputConverter(finalRecord);
+            output.add(structuredRecord);
+        }
+
+        @SuppressWarnings("unchecked")
+        private OUT applyOutputConverter(OUT record) {
+            if (record instanceof Row) {
+                Object internal = 
harnessOutputConverter.toInternalOrNull(record);
+                Object external = 
harnessOutputConverter.toExternalOrNull(internal);
+                return (OUT) external;
+            }
+            return record;
+        }
+
+        @SuppressWarnings("unchecked")
+        private OUT prependPartitionKeys(OUT ptfOutput) {
+            if (!(ptfOutput instanceof Row)) {
+                throw new IllegalStateException(
+                        "Cannot prepend partition keys to non-Row output type: 
"
+                                + ptfOutput.getClass());
+            }
+
+            Row ptfRow = (Row) ptfOutput;
+
+            int totalPartitionKeyCount = 0;
+            for (ArgumentInfo arg : arguments) {
+                if (arg.isSetSemantic && arg.partitionColumnNames != null) {
+                    totalPartitionKeyCount += arg.partitionColumnNames.length;
+                }
+            }
+
+            int ptfOutputArity = ptfRow.getArity();
+            int totalArity = totalPartitionKeyCount + ptfOutputArity;
+
+            Row result = new Row(ptfRow.getKind(), totalArity);
+
+            // Extract partition key values from active row
+            Object[] partitionKeyValues = new 
Object[activeTableArg.partitionColumnNames.length];
+            for (int i = 0; i < activeTableArg.partitionColumnNames.length; 
i++) {
+                String columnName = activeTableArg.partitionColumnNames[i];
+                int columnIndex = getFieldIndex(activeTableArg.dataType, 
columnName);
+                partitionKeyValues[i] = activeRow.getField(columnIndex);
+            }
+
+            int resultIndex = 0;
+            for (ArgumentInfo arg : arguments) {
+                if (arg.isSetSemantic && arg.partitionColumnNames != null) {
+                    for (int i = 0; i < arg.partitionColumnNames.length; i++) {
+                        result.setField(resultIndex++, partitionKeyValues[i]);
+                    }
+                }
+            }
+
+            for (int i = 0; i < ptfOutputArity; i++) {
+                result.setField(resultIndex++, ptfRow.getField(i));
+            }
+
+            return (OUT) result;
+        }
+
+        /** Helper to get field index by name from a DataType. */
+        private int getFieldIndex(DataType dataType, String fieldName) {
+            List<String> fieldNames = getFieldNames(dataType);
+            int index = fieldNames.indexOf(fieldName);
+            if (index < 0) {
+                throw new IllegalStateException(
+                        String.format("Field '%s' not found in type %s", 
fieldName, dataType));
+            }
+            return index;
+        }
+
+        @SuppressWarnings("unchecked")
+        private OUT prependAllColumns(OUT ptfOutput) {
+            if (!(ptfOutput instanceof Row)) {
+                throw new IllegalStateException(
+                        "Cannot prepend columns to non-Row output type: " + 
ptfOutput.getClass());
+            }
+
+            Row ptfRow = (Row) ptfOutput;
+            int inputArity = activeRow.getArity();
+            int ptfOutputArity = ptfRow.getArity();
+            int totalArity = inputArity + ptfOutputArity;
+
+            Row result = new Row(ptfRow.getKind(), totalArity);
+
+            for (int i = 0; i < inputArity; i++) {
+                result.setField(i, activeRow.getField(i));
+            }
+
+            for (int i = 0; i < ptfOutputArity; i++) {
+                result.setField(inputArity + i, ptfRow.getField(i));
+            }
+
+            return (OUT) result;
+        }
+
+        @Override
+        public void close() {}
+    }
+
+    /** Extracts field names from RowType or StructuredType. */
+    private static List<String> getFieldNames(DataType dataType) {
+        LogicalType logicalType = dataType.getLogicalType();
+        List<String> fieldNames = new ArrayList<>();
+
+        if (logicalType instanceof RowType) {
+            RowType rowType = (RowType) logicalType;
+            for (RowType.RowField field : rowType.getFields()) {
+                fieldNames.add(field.getName());
+            }
+        } else if (logicalType instanceof StructuredType) {
+            StructuredType structuredType = (StructuredType) logicalType;
+            for (StructuredType.StructuredAttribute attr : 
structuredType.getAttributes()) {
+                fieldNames.add(attr.getName());
+            }
+        } else {
+            throw new IllegalStateException(
+                    String.format(
+                            "Unsupported data type: %s. "
+                                    + "Only Row and structured types are 
supported.",
+                            dataType));
+        }
+
+        return fieldNames;
+    }
+
+    /**
+     * Builder for {@link ProcessTableFunctionTestHarness}.
+     *
+     * @param <OUT> The output type of the ProcessTableFunction
+     */
+    @PublicEvolving
+    public static class Builder<OUT> {
+        private final Class<? extends ProcessTableFunction<OUT>> functionClass;
+
+        private final Map<String, ScalarArgumentConfiguration> scalarArgs = 
new HashMap<>();
+        private final Map<String, TableArgumentConfiguration> tableArgs = new 
HashMap<>();
+        private final Map<String, PartitionConfiguration> partitionConfigs = 
new HashMap<>();
+
+        private Builder(Class<? extends ProcessTableFunction<OUT>> 
functionClass) {
+            this.functionClass = checkNotNull(functionClass, "functionClass 
must not be null");
+        }
+
+        private void validateArgumentNotYetConfigured(String argumentName) {
+            if (scalarArgs.containsKey(argumentName) || 
tableArgs.containsKey(argumentName)) {
+                throw new IllegalArgumentException("Argument already 
configured: " + argumentName);
+            }
+        }
+
+        // 
---------------------------------------------------------------------
+        // Table & Scalar Arguments
+        // 
---------------------------------------------------------------------
+
+        /**
+         * Configures a table argument with its schema (named argument).
+         *
+         * <p>Use this for dynamic tables that receive elements during the 
test. Elements are
+         * provided via {@link #processElement(Row)} or {@link 
#processElementForTable(String,
+         * Row)}.
+         *
+         * @param argumentName The table argument name
+         * @param dataType The schema/structure of the table
+         */
+        public Builder<OUT> withTableArgument(String argumentName, 
AbstractDataType<?> dataType) {
+            checkNotNull(argumentName, "argumentName must not be null");
+            checkNotNull(dataType, "dataType must not be null");
+
+            validateArgumentNotYetConfigured(argumentName);
+
+            tableArgs.put(argumentName, new 
TableArgumentConfiguration(dataType));
+            return this;
+        }
+
+        /**
+         * Configures a table argument without an explicit schema.
+         *
+         * <p>Use this for structured type arguments where the type can be 
inferred from the PTF's
+         * eval() signature. For Row arguments, use {@link 
#withTableArgument(String,
+         * AbstractDataType)} with an explicit schema.
+         *
+         * @param argumentName The table argument name
+         */
+        public Builder<OUT> withTableArgument(String argumentName) {
+            checkNotNull(argumentName, "argumentName must not be null");
+
+            validateArgumentNotYetConfigured(argumentName);
+
+            tableArgs.put(argumentName, new TableArgumentConfiguration(null));
+            return this;
+        }
+
+        /**
+         * Configures a scalar (non-table) argument for the PTF's eval() 
method.
+         *
+         * <p>Scalar arguments are constant values passed to every eval() 
invocation, such as
+         * thresholds, multipliers, or configuration parameters.
+         *
+         * @param argumentName Must match the parameter name in eval() or the 
@ArgumentHint name
+         * @param value The value to pass for this argument in all eval() calls
+         */
+        public Builder<OUT> withScalarArgument(String argumentName, Object 
value) {
+            checkNotNull(argumentName, "argumentName must not be null");
+
+            validateArgumentNotYetConfigured(argumentName);
+
+            ScalarArgumentConfiguration config = new 
ScalarArgumentConfiguration(value);
+            scalarArgs.put(argumentName, config);
+            return this;
+        }
+
+        // 
---------------------------------------------------------------------
+        // Partitioning
+        // 
---------------------------------------------------------------------
+
+        /**
+         * Specifies partition columns for a set semantic table.
+         *
+         * @param argumentName The table argument name
+         * @param columnNames The partition column names
+         * @return This builder
+         */
+        public Builder<OUT> withPartitionBy(String argumentName, String... 
columnNames) {
+            checkNotNull(argumentName, "argumentName must not be null");
+            checkNotNull(columnNames, "columnNames must not be null");
+            checkArgument(columnNames.length > 0, "Must specify at least one 
column");
+
+            if (partitionConfigs.containsKey(argumentName)) {
+                throw new IllegalArgumentException(
+                        "Partition config already exists for: " + 
argumentName);
+            }
+
+            PartitionConfiguration config = new 
PartitionConfiguration(columnNames);
+            partitionConfigs.put(argumentName, config);
+            return this;
+        }
+
+        // 
---------------------------------------------------------------------
+        // Build
+        // 
---------------------------------------------------------------------
+
+        /**
+         * Builds the test harness.
+         *
+         * <p>This instantiates the PTF, validates configuration via type 
inference, creates the
+         * FunctionContext, and opens the function.
+         *
+         * @return The configured test harness
+         * @throws Exception If instantiation or opening fails
+         */
+        public ProcessTableFunctionTestHarness<OUT> build() throws Exception {
+            ProcessTableFunction<OUT> function = instantiateFunction();
+
+            DataTypeFactory dataTypeFactory = createDataTypeFactory();
+            TypeInference baseTypeInference = 
function.getTypeInference(dataTypeFactory);
+            TypeInference systemTypeInference =
+                    SystemTypeInference.of(FunctionKind.PROCESS_TABLE, 
baseTypeInference);
+
+            List<ArgumentInfo> arguments =
+                    extractAndValidateTypeInference(function, 
systemTypeInference);
+
+            FunctionContext functionContext =
+                    new FunctionContext(null, 
Thread.currentThread().getContextClassLoader(), null);
+
+            Method evalMethod = findEvalMethod();
+
+            validateEvalMethodSupported(evalMethod, arguments);
+            validatePartitionConsistency(arguments);
+
+            Map<String, ConverterPair> argumentConverters = new HashMap<>();
+            createConverters(arguments, argumentConverters);
+
+            // Derive output schema using SystemTypeInference (includes 
deduplication)
+            DataType derivedOutputType =
+                    deriveOutputTypeFromSystemInference(
+                            function, dataTypeFactory, systemTypeInference, 
arguments);
+
+            // Create output converter for PTF emissions
+            DataStructureConverter<Object, Object> harnessOutputConverter =
+                    createPTFOutputConverter(derivedOutputType);
+
+            return new ProcessTableFunctionTestHarness<>(
+                    function,
+                    functionContext,
+                    evalMethod,
+                    arguments,
+                    extractScalarValues(arguments),
+                    argumentConverters,
+                    harnessOutputConverter);
+        }
+
+        /** Extracts scalar values from configs, creating a map keyed by 
argument name. */
+        private Map<String, Object> extractScalarValues(List<ArgumentInfo> 
arguments) {
+            Map<String, Object> values = new HashMap<>();
+            for (ArgumentInfo arg : arguments) {
+                if (arg.isScalar) {
+                    ScalarArgumentConfiguration config = 
scalarArgs.get(arg.name);
+                    if (config != null) {
+                        values.put(arg.name, config.value);
+                    }
+                }
+            }
+            return values;
+        }
+
+        /**
+         * Creates converter that enriches PTF output Rows with field names 
from the derived schema.
+         */
+        private DataStructureConverter<Object, Object> 
createPTFOutputConverter(
+                DataType derivedOutputType) {
+            ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+            DataStructureConverter<Object, Object> converter =
+                    DataStructureConverters.getConverter(derivedOutputType);
+            converter.open(classLoader);
+            return converter;
+        }
+
+        /**
+         * Creates and initializes data structure converters for all table 
arguments.
+         *
+         * <p>For Row types, both input and output converters are the same 
(between Row and
+         * RowData).
+         *
+         * <p>For structured types, input converter uses Row types (Row to 
RowData), and the output
+         * converter uses the structured type.
+         */
+        private void createConverters(
+                List<ArgumentInfo> arguments, Map<String, ConverterPair> 
argumentConverters) {
+            ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+            for (ArgumentInfo arg : arguments) {
+                if (arg.isTableArgument) {
+                    String converterKey = arg.name;
+
+                    LogicalType logicalType = arg.dataType.getLogicalType();
+                    boolean isStructuredType =
+                            logicalType instanceof StructuredType
+                                    && ((StructuredType) logicalType)
+                                            .getImplementationClass()
+                                            .isPresent();
+
+                    if (isStructuredType) {
+                        StructuredType structuredType = (StructuredType) 
logicalType;
+                        List<RowType.RowField> rowFields = new ArrayList<>();
+                        for (StructuredType.StructuredAttribute attr :
+                                structuredType.getAttributes()) {
+                            rowFields.add(new RowType.RowField(attr.getName(), 
attr.getType()));
+                        }
+                        RowType rowType = new 
RowType(logicalType.isNullable(), rowFields);
+                        DataType rowDataType = 
TypeConversions.fromLogicalToDataType(rowType);
+
+                        DataStructureConverter<Object, Object> inputConverter =
+                                
DataStructureConverters.getConverter(rowDataType);
+                        inputConverter.open(classLoader);
+
+                        DataStructureConverter<Object, Object> outputConverter 
=
+                                
DataStructureConverters.getConverter(arg.dataType);
+                        outputConverter.open(classLoader);
+
+                        argumentConverters.put(
+                                converterKey, new 
ConverterPair(inputConverter, outputConverter));
+                    } else {
+                        // For Row types, input and output converters are the 
same
+                        DataStructureConverter<Object, Object> converter =
+                                
DataStructureConverters.getConverter(arg.dataType);
+                        converter.open(classLoader);
+
+                        argumentConverters.put(
+                                converterKey, new ConverterPair(converter, 
converter));
+                    }
+                }
+            }
+        }
+
+        private Method findEvalMethod() throws NoSuchMethodException {
+            Method[] methods = functionClass.getMethods();
+            Method evalMethod = null;
+            int evalMethodCount = 0;
+
+            for (Method method : methods) {
+                if (method.getName().equals("eval")) {
+                    evalMethod = method;
+                    evalMethodCount++;
+                }
+            }
+
+            if (evalMethodCount == 0) {
+                throw new NoSuchMethodException(
+                        "No eval() method found in " + 
functionClass.getSimpleName());
+            } else if (evalMethodCount > 1) {
+                throw new IllegalStateException(
+                        "Multiple eval() methods found in "
+                                + functionClass.getSimpleName()
+                                + ". ProcessTableFunction must have exactly 
one eval() method.");
+            } else {
+                return evalMethod;
+            }
+        }
+
+        /**
+         * Validates that the eval() method doesn't use unsupported features. 
Temporary, until state
+         * and context is supported.
+         */
+        private void validateEvalMethodSupported(Method evalMethod, 
List<ArgumentInfo> arguments) {
+            Parameter[] parameters = evalMethod.getParameters();
+
+            for (int i = 0; i < parameters.length; i++) {
+                Parameter param = parameters[i];
+                Class<?> paramType = param.getType();
+
+                if 
(ProcessTableFunction.Context.class.isAssignableFrom(paramType)) {
+                    throw new IllegalStateException(
+                            String.format(
+                                    "ProcessTableFunctionTestHarness does not 
yet support Context parameters. "
+                                            + "Found Context parameter at 
position %d in eval() method. ",
+                                    i));
+                }
+
+                if (param.isAnnotationPresent(StateHint.class)) {
+                    throw new IllegalStateException(
+                            String.format(
+                                    "ProcessTableFunctionTestHarness does not 
yet support state parameters. "
+                                            + "Found @StateHint parameter at 
position %d in eval() method. ",
+                                    i));
+                }
+            }
+
+            if (parameters.length != arguments.size()) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Parameter count mismatch: eval() has %d 
parameters but only %d arguments were extracted. "
+                                        + "This may indicate missing 
@ArgumentHint annotations.",
+                                parameters.length, arguments.size()));
+            }
+
+            for (int i = 0; i < parameters.length; i++) {
+                Parameter param = parameters[i];
+                Class<?> paramType = param.getType();
+                ArgumentInfo arg = arguments.get(i);
+
+                if (arg.isScalar) {
+                    ScalarArgumentConfiguration config = 
scalarArgs.get(arg.name);
+                    if (config != null
+                            && config.value != null
+                            && 
!paramType.isAssignableFrom(config.value.getClass())) {
+                        throw new IllegalStateException(
+                                String.format(
+                                        "Type mismatch for scalar argument 
'%s' at position %d: "
+                                                + "eval() parameter expects %s 
but provided value is %s",
+                                        arg.name,
+                                        i,
+                                        paramType.getName(),
+                                        config.value.getClass().getName()));
+                    }
+                }
+            }
+        }
+
+        /**
+         * Validates that all SET_SEMANTIC_TABLE arguments with partitioning 
use consistent
+         * partitioning. All such arguments must have the same number of 
partition columns with
+         * matching data types.
+         */
+        private void validatePartitionConsistency(List<ArgumentInfo> 
arguments) {
+            final List<ArgumentInfo> partitionedTables = new ArrayList<>();
+            for (ArgumentInfo arg : arguments) {
+                if (arg.isSetSemantic && arg.partitionColumnNames != null) {
+                    partitionedTables.add(arg);
+                }
+            }
+
+            if (partitionedTables.size() <= 1) {
+                return;
+            }
+
+            final ArgumentInfo first = partitionedTables.get(0);
+            final int expectedPartitionColumnCount = 
first.partitionColumnNames.length;
+
+            for (int i = 1; i < partitionedTables.size(); i++) {
+                ArgumentInfo current = partitionedTables.get(i);
+
+                if (current.partitionColumnNames.length != 
expectedPartitionColumnCount) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Inconsistent partitioning: Table argument 
'%s' has %d partition column(s), "
+                                            + "but table argument '%s' has %d 
partition column(s). "
+                                            + "All SET_SEMANTIC_TABLE 
arguments must use consistent partitioning "
+                                            + "(same number of columns and 
matching data types).",
+                                    first.name,
+                                    expectedPartitionColumnCount,
+                                    current.name,
+                                    current.partitionColumnNames.length));
+                }
+
+                // Check that partition column types match
+                for (int colIdx = 0; colIdx < expectedPartitionColumnCount; 
colIdx++) {
+                    String firstColName = first.partitionColumnNames[colIdx];
+                    String currentColName = 
current.partitionColumnNames[colIdx];
+                    DataType firstColType = extractPartitionColumnType(first, 
firstColName);
+                    DataType currentColType = 
extractPartitionColumnType(current, currentColName);
+
+                    if 
(!firstColType.getLogicalType().equals(currentColType.getLogicalType())) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "Inconsistent partitioning: Partition 
column '%s' of table argument '%s' "
+                                                + "has type %s, but partition 
column '%s' of table argument '%s' "
+                                                + "has type %s. All 
SET_SEMANTIC_TABLE arguments must use "
+                                                + "consistent partitioning 
(same number of columns and matching data types).",
+                                        firstColName,
+                                        first.name,
+                                        firstColType,
+                                        currentColName,
+                                        current.name,
+                                        currentColType));
+                    }
+                }
+            }
+        }
+
+        private DataType extractPartitionColumnType(ArgumentInfo arg, String 
columnName) {
+            if (!(arg.dataType instanceof FieldsDataType)) {
+                throw new IllegalStateException(
+                        String.format(
+                                "Cannot extract data type for partition column 
'%s' of argument '%s': "
+                                        + "argument data type is not a 
FieldsDataType (actual: %s)",
+                                columnName, arg.name, 
arg.dataType.getClass().getSimpleName()));
+            }
+
+            FieldsDataType fieldsDataType = (FieldsDataType) arg.dataType;
+            List<String> fieldNames = getFieldNames(arg.dataType);
+            List<DataType> fieldDataTypes = fieldsDataType.getChildren();
+
+            int fieldIndex = fieldNames.indexOf(columnName);
+            if (fieldIndex >= 0) {
+                return fieldDataTypes.get(fieldIndex);
+            }
+
+            throw new IllegalStateException(
+                    String.format(
+                            "Partition column '%s' not found in argument '%s'",
+                            columnName, arg.name));
+        }
+
+        // 
---------------------------------------------------------------------
+        // Type Inference
+        // 
---------------------------------------------------------------------
+
+        /**
+         * Extracts type inference from the PTF and validates builder 
configuration.
+         *
+         * <p>Uses SystemTypeInference to validate things like reserved 
argument names, multiple
+         * table argument rules, static argument trait validation, etc.
+         */
+        private List<ArgumentInfo> extractAndValidateTypeInference(
+                ProcessTableFunction<OUT> function, TypeInference 
systemTypeInference) {
+
+            Optional<List<StaticArgument>> staticArgsOpt = 
systemTypeInference.getStaticArguments();
+            if (staticArgsOpt.isEmpty()) {
+                throw new IllegalStateException(
+                        "PTF does not provide static argument information. "
+                                + "Ensure @ArgumentHint annotations are 
present on all eval() parameters.");
+            }
+
+            List<StaticArgument> allArgs = staticArgsOpt.get();
+            List<StaticArgument> userArgs = new ArrayList<>();
+            for (StaticArgument arg : allArgs) {
+                if (!isSystemArgument(arg.getName())) {
+                    userArgs.add(arg);
+                }
+            }
+
+            List<ArgumentInfo> arguments = new ArrayList<>();
+
+            for (StaticArgument staticArg : userArgs) {
+                boolean isScalar = 
staticArg.getTraits().contains(StaticArgumentTrait.SCALAR);
+                boolean isTableArg =
+                        
staticArg.getTraits().contains(StaticArgumentTrait.ROW_SEMANTIC_TABLE)
+                                || staticArg
+                                        .getTraits()
+                                        
.contains(StaticArgumentTrait.SET_SEMANTIC_TABLE);
+
+                if (isScalar || isTableArg) {
+                    ArgumentInfo argInfo = buildArgumentInfo(staticArg);
+                    arguments.add(argInfo);
+                } else {
+                    throw new IllegalStateException(
+                            "Unknown argument type for StaticArgument. "
+                                    + "Expected SCALAR, ROW_SEMANTIC_TABLE, or 
SET_SEMANTIC_TABLE trait.");
+                }
+            }
+
+            validateArgumentConfiguration(arguments);
+
+            return arguments;
+        }
+
+        /** Checks if an argument name is a system-reserved argument. */
+        private boolean isSystemArgument(String argName) {
+            return 
SystemTypeInference.PROCESS_TABLE_FUNCTION_ARG_ON_TIME.equals(argName)
+                    || 
SystemTypeInference.PROCESS_TABLE_FUNCTION_ARG_UID.equals(argName);
+        }
+
+        private DataTypeFactory createDataTypeFactory() {
+            return new TestHarnessDataTypeFactory();
+        }
+
+        private ArgumentInfo buildArgumentInfo(StaticArgument staticArg) {
+
+            String name = staticArg.getName();
+            ArgumentTrait primaryTrait = 
extractPrimaryTrait(staticArg.getTraits());
+
+            DataType dataType;
+            if (primaryTrait == ArgumentTrait.SCALAR) {
+                Optional<DataType> dataTypeOpt = staticArg.getDataType();
+                if (dataTypeOpt.isPresent()) {
+                    dataType = dataTypeOpt.get();
+                } else {
+                    throw new IllegalStateException(
+                            String.format(
+                                    "Cannot determine data type for scalar 
argument '%s'", name));
+                }
+            } else {
+                // For table arguments, check both annotation and builder 
config
+                Optional<DataType> annotationTypeOpt = staticArg.getDataType();
+                TableArgumentConfiguration config = tableArgs.get(name);
+
+                if (annotationTypeOpt.isPresent()
+                        && config != null
+                        && config.explicitType != null) {
+                    // Both specified - validate they match
+                    DataTypeFactory factory = createDataTypeFactory();
+                    DataType builderType = 
factory.createDataType(config.explicitType);
+                    DataType annotationType = annotationTypeOpt.get();
+
+                    if (!annotationType.equals(builderType)) {
+                        throw new IllegalStateException(
+                                String.format(
+                                        "Type mismatch for table argument 
'%s': "
+                                                + "annotation declares type %s 
but builder declares type %s. "
+                                                + "If the PTF already 
explicitly declares type information, "
+                                                + "there is no need to specify 
it in the builder.",
+                                        name, annotationType, builderType));
+                    }
+                    dataType = annotationType;
+                } else if (annotationTypeOpt.isPresent()) {
+                    dataType = annotationTypeOpt.get();
+                } else if (config != null && config.explicitType != null) {
+                    DataTypeFactory factory = createDataTypeFactory();
+                    dataType = factory.createDataType(config.explicitType);
+                } else {
+                    // Try to infer from Java parameter class (for structured 
types)
+                    Optional<Class<?>> conversionClassOpt = 
staticArg.getConversionClass();
+                    if (conversionClassOpt.isPresent()) {
+                        DataTypeFactory factory = createDataTypeFactory();
+                        dataType = 
factory.createDataType(conversionClassOpt.get());
+                    } else {
+                        throw new IllegalStateException(
+                                String.format(
+                                        "Table argument '%s' requires explicit 
type configuration. "
+                                                + "Use 
.withTableArgument(\"%s\", DataTypes.of(\"ROW<...>\")) "
+                                                + "to explicitly declare it.",
+                                        name, name));
+                    }
+                }
+            }
+
+            String[] partitionColumnNames = null;
+            if (primaryTrait == ArgumentTrait.SET_SEMANTIC_TABLE) {
+                boolean hasOptionalPartitionBy =
+                        
staticArg.getTraits().contains(StaticArgumentTrait.OPTIONAL_PARTITION_BY);
+                partitionColumnNames =
+                        extractAndValidatePartitionColumns(name, dataType, 
hasOptionalPartitionBy);
+            }
+
+            boolean hasPassColumnsThrough =
+                    
staticArg.getTraits().contains(StaticArgumentTrait.PASS_COLUMNS_THROUGH);
+
+            return new ArgumentInfo(
+                    name, dataType, primaryTrait, partitionColumnNames, 
hasPassColumnsThrough);
+        }
+
+        private ArgumentTrait extractPrimaryTrait(EnumSet<StaticArgumentTrait> 
staticTraits) {
+            if (staticTraits.contains(StaticArgumentTrait.SCALAR)) {
+                return ArgumentTrait.SCALAR;
+            }
+            if (staticTraits.contains(StaticArgumentTrait.ROW_SEMANTIC_TABLE)) 
{
+                return ArgumentTrait.ROW_SEMANTIC_TABLE;
+            }
+            if (staticTraits.contains(StaticArgumentTrait.SET_SEMANTIC_TABLE)) 
{
+                return ArgumentTrait.SET_SEMANTIC_TABLE;
+            }
+            return ArgumentTrait.SCALAR;
+        }
+
+        private String[] extractAndValidatePartitionColumns(
+                String name, DataType dataType, boolean isOptionalPartitionBy) 
{
+            PartitionConfiguration config = partitionConfigs.get(name);
+            if (config == null) {
+                if (isOptionalPartitionBy) {
+                    return null;
+                }
+                throw new IllegalStateException(
+                        String.format(
+                                "No partition configuration found for 
SET_SEMANTIC_TABLE argument '%s'. "
+                                        + "Use withPartitionBy(\"%s\", ...) to 
configure partitioning.",
+                                name, name));
+            }
+
+            List<String> fieldNames = getFieldNames(dataType);
+
+            for (String columnName : config.columnNames) {
+                if (!fieldNames.contains(columnName)) {
+                    throw new IllegalArgumentException(
+                            "Partition column '"
+                                    + columnName
+                                    + "' not found. "
+                                    + "Available columns: "
+                                    + fieldNames);
+                }
+            }
+            return config.columnNames;
+        }
+
+        /** Validates scalar argument values are configured and no unknown 
arguments exist. */
+        private void validateArgumentConfiguration(List<ArgumentInfo> 
arguments) {
+            for (ArgumentInfo arg : arguments) {
+                if (arg.isScalar && !scalarArgs.containsKey(arg.name)) {
+                    throw new IllegalStateException(
+                            String.format(
+                                    "Missing required scalar argument '%s'. "
+                                            + "Use .withScalarArgument(\"%s\", 
...)",
+                                    arg.name, arg.name));
+                }
+            }
+
+            Set<String> validNames = new HashSet<>();
+            for (ArgumentInfo arg : arguments) {
+                if (arg.name != null) {
+                    validNames.add(arg.name);
+                }
+            }
+
+            for (String configuredScalar : scalarArgs.keySet()) {
+                if (!validNames.contains(configuredScalar)) {
+                    throw new IllegalStateException(
+                            "Unknown scalar argument: '"
+                                    + configuredScalar
+                                    + "'. Not found in PTF signature.");
+                }
+            }
+
+            for (String configuredTable : tableArgs.keySet()) {
+                if (!validNames.contains(configuredTable)) {
+                    throw new IllegalStateException(
+                            "Unknown table argument: '"
+                                    + configuredTable
+                                    + "'. Not found in PTF signature.");
+                }
+            }
+        }
+
+        private ProcessTableFunction<OUT> instantiateFunction() throws 
IllegalArgumentException {
+            try {
+                return functionClass.getDeclaredConstructor().newInstance();
+            } catch (NoSuchMethodException e) {
+                throw new IllegalArgumentException(
+                        "PTF class must have a no-arg constructor: " + 
functionClass.getName(), e);
+            } catch (Exception e) {
+                throw new IllegalArgumentException(
+                        "Failed to instantiate PTF: " + 
functionClass.getName(), e);
+            }
+        }
+
+        /**
+         * Derives the output schema using SystemTypeInference, including 
field name deduplication.
+         */
+        private DataType deriveOutputTypeFromSystemInference(
+                ProcessTableFunction<OUT> function,
+                DataTypeFactory dataTypeFactory,
+                TypeInference systemTypeInference,
+                List<ArgumentInfo> arguments) {
+
+            List<DataType> argumentDataTypes = new ArrayList<>();
+            for (ArgumentInfo arg : arguments) {
+                argumentDataTypes.add(arg.dataType);
+            }
+
+            Map<Integer, TableSemantics> tableSemanticsMap = new HashMap<>();
+            for (int i = 0; i < arguments.size(); i++) {
+                ArgumentInfo arg = arguments.get(i);
+                if (arg.isTableArgument) {
+                    int[] partitionIndices = getPartitionColumnIndices(arg);
+                    TableSemantics semantics =
+                            new TestHarnessTableSemantics(arg.dataType, 
partitionIndices);
+                    tableSemanticsMap.put(i, semantics);
+                }
+            }
+
+            TestHarnessCallContext callContext = new TestHarnessCallContext();
+            callContext.typeFactory = dataTypeFactory;
+            callContext.argumentDataTypes = argumentDataTypes;
+            callContext.functionDefinition = function;
+            callContext.tableSemantics = tableSemanticsMap;
+            callContext.name = function.getClass().getSimpleName();
+
+            TypeStrategy outputStrategy = 
systemTypeInference.getOutputTypeStrategy();
+            Optional<DataType> outputTypeOpt = 
outputStrategy.inferType(callContext);
+
+            if (outputTypeOpt.isEmpty()) {
+                throw new IllegalStateException(
+                        "Failed to derive output type from SystemTypeInference 
for "
+                                + function.getClass().getSimpleName());
+            }
+
+            return outputTypeOpt.get();
+        }
+
+        private static List<String> extractFieldNames(DataType dataType) {
+            LogicalType logicalType = dataType.getLogicalType();
+            if (logicalType instanceof RowType) {
+                return ((RowType) logicalType).getFieldNames();
+            } else if (logicalType instanceof StructuredType) {
+                return ((StructuredType) logicalType)
+                        .getAttributes().stream()
+                                
.map(StructuredType.StructuredAttribute::getName)
+                                .collect(java.util.stream.Collectors.toList());
+            } else {
+                throw new IllegalStateException(
+                        "Expected RowType or StructuredType, got: "
+                                + logicalType.getClass().getSimpleName());
+            }
+        }
+
+        private int[] getPartitionColumnIndices(ArgumentInfo arg) {
+            if (arg.partitionColumnNames == null || 
arg.partitionColumnNames.length == 0) {
+                return new int[0];
+            }
+
+            List<String> fieldNames = extractFieldNames(arg.dataType);
+
+            int[] indices = new int[arg.partitionColumnNames.length];
+            for (int i = 0; i < arg.partitionColumnNames.length; i++) {
+                String colName = arg.partitionColumnNames[i];
+                int index = fieldNames.indexOf(colName);
+                if (index < 0) {
+                    throw new IllegalStateException(
+                            "Partition column '"
+                                    + colName
+                                    + "' not found in table argument. "
+                                    + "Available fields: "
+                                    + fieldNames);
+                }
+                indices[i] = index;
+            }
+
+            return indices;
+        }
+    }
+
+    private enum OutputPrependStrategy {
+        NONE,
+        PARTITION_KEYS,
+        ALL_COLUMNS
+    }
+
+    private void handleEvalInvocationException(
+            String contextMessage, Object[] args, InvocationTargetException e) 
throws Exception {
+        Throwable cause = e.getCause();
+        StringBuilder details = new StringBuilder();
+        details.append(contextMessage);
+        details.append("Expected parameter types: ");
+        details.append(Arrays.toString(evalMethod.getParameterTypes()));
+        details.append("\nActual arguments:\n");
+        for (int i = 0; i < arguments.size(); i++) {
+            ArgumentInfo arg = arguments.get(i);
+            Object value = args[i];
+            details.append(
+                    String.format(
+                            "  [%d] %s: %s (type: %s)\n",
+                            i,
+                            arg.name,
+                            value,
+                            value != null ? value.getClass().getName() : 
"null"));
+        }
+
+        if (cause instanceof Exception) {
+            Exception userException = (Exception) cause;
+            userException.addSuppressed(new Exception(details.toString()));
+            throw userException;
+        } else {
+            throw new RuntimeException(details.toString(), e);
+        }
+    }
+
+    /**
+     * Metadata for a single argument extracted from type inference.
+     *
+     * <p>Represents validated argument information combining PTF signature, 
type inference results,
+     * and builder configuration.
+     */
+    private static class ArgumentInfo {
+        final String name;
+        final DataType dataType;
+        final String[] partitionColumnNames;
+        final boolean isScalar;
+        final boolean isTableArgument;
+        final boolean isSetSemantic;
+        final OutputPrependStrategy prependStrategy;
+
+        ArgumentInfo(
+                String name,
+                DataType dataType,
+                ArgumentTrait primaryTrait,
+                String[] partitionColumnNames,
+                boolean hasPassColumnsThrough) {
+            this.name = name;
+            this.dataType = dataType;
+            this.partitionColumnNames = partitionColumnNames;
+            this.isScalar = (primaryTrait == ArgumentTrait.SCALAR);
+            this.isTableArgument = (primaryTrait != ArgumentTrait.SCALAR);
+            this.isSetSemantic = (primaryTrait == 
ArgumentTrait.SET_SEMANTIC_TABLE);
+            this.prependStrategy =
+                    hasPassColumnsThrough
+                            ? OutputPrependStrategy.ALL_COLUMNS
+                            : (this.isSetSemantic && partitionColumnNames != 
null)
+                                    ? OutputPrependStrategy.PARTITION_KEYS
+                                    : OutputPrependStrategy.NONE;
+        }
+    }
+
+    private static class TableArgumentConfiguration {
+        final AbstractDataType<?> explicitType;
+
+        TableArgumentConfiguration(AbstractDataType<?> explicitType) {
+            this.explicitType = explicitType;
+        }
+    }
+
+    private static class ScalarArgumentConfiguration {
+        final Object value;
+
+        ScalarArgumentConfiguration(Object value) {
+            this.value = value;
+        }
+    }
+
+    private static class PartitionConfiguration {
+        final String[] columnNames;
+
+        PartitionConfiguration(String[] columnNames) {
+            this.columnNames = columnNames;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessCallContext.java
 
b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessCallContext.java
new file mode 100644
index 00000000000..62c99bc41a1
--- /dev/null
+++ 
b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessCallContext.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.ModelSemantics;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * {@link CallContext} implementation for {@link 
ProcessTableFunctionTestHarness} for use in
+ * deriving output schemas.
+ */
+@Internal
+class TestHarnessCallContext implements CallContext {
+    DataTypeFactory typeFactory;
+    List<DataType> argumentDataTypes;
+    FunctionDefinition functionDefinition;
+    Map<Integer, TableSemantics> tableSemantics;
+    String name;
+
+    @Override
+    public DataTypeFactory getDataTypeFactory() {
+        return typeFactory;
+    }
+
+    @Override
+    public FunctionDefinition getFunctionDefinition() {
+        return functionDefinition;
+    }
+
+    @Override
+    public boolean isArgumentLiteral(int pos) {
+        return false;
+    }
+
+    @Override
+    public boolean isArgumentNull(int pos) {
+        return false;
+    }
+
+    @Override
+    public <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
+        return Optional.empty();
+    }
+
+    @Override
+    public String getName() {
+        return name;
+    }
+
+    @Override
+    public List<DataType> getArgumentDataTypes() {
+        return argumentDataTypes;
+    }
+
+    @Override
+    public Optional<DataType> getOutputDataType() {
+        return Optional.empty();
+    }
+
+    @Override
+    public boolean isGroupedAggregation() {
+        return false;
+    }
+
+    @Override
+    public Optional<TableSemantics> getTableSemantics(int pos) {
+        if (tableSemantics == null) {
+            return Optional.empty();
+        }
+        return Optional.ofNullable(tableSemantics.get(pos));
+    }
+
+    @Override
+    public Optional<ModelSemantics> getModelSemantics(int pos) {
+        return Optional.empty();
+    }
+}
diff --git 
a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessDataTypeFactory.java
 
b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessDataTypeFactory.java
new file mode 100644
index 00000000000..af748db7ad2
--- /dev/null
+++ 
b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessDataTypeFactory.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.UnresolvedDataType;
+import org.apache.flink.table.types.extraction.DataTypeExtractor;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.TypeInfoDataTypeConverter;
+
+/**
+ * {@link DataTypeFactory} implementation for {@link 
ProcessTableFunctionTestHarness}.
+ *
+ * <p>Only {@link #createDataType(AbstractDataType)}, {@link 
#createDataType(String)}, {@link
+ * #createDataType(Class)}, and {@link #createDataType(TypeInformation)} are 
supported.
+ */
+@Internal
+class TestHarnessDataTypeFactory implements DataTypeFactory {
+
+    @Override
+    public DataType createDataType(AbstractDataType<?> abstractDataType) {
+        if (abstractDataType instanceof DataType) {
+            return (DataType) abstractDataType;
+        } else if (abstractDataType instanceof UnresolvedDataType) {
+            return ((UnresolvedDataType) abstractDataType).toDataType(this);
+        }
+        throw new IllegalStateException();
+    }
+
+    @Override
+    public DataType createDataType(String typeString) {
+        LogicalType logicalType =
+                LogicalTypeParser.parse(typeString, 
Thread.currentThread().getContextClassLoader());
+        return 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType(
+                logicalType);
+    }
+
+    @Override
+    public DataType createDataType(UnresolvedIdentifier identifier) {
+        throw new UnsupportedOperationException(
+                "TestHarnessDataTypeFactory does not support 
createDataType(UnresolvedIdentifier).");
+    }
+
+    @Override
+    public <T> DataType createDataType(Class<T> clazz) {
+        return DataTypeExtractor.extractFromType(this, clazz);
+    }
+
+    @Override
+    public <T> DataType createDataType(TypeInformation<T> typeInfo) {
+        return TypeInfoDataTypeConverter.toDataType(this, typeInfo);
+    }
+
+    @Override
+    public <T> DataType createRawDataType(Class<T> clazz) {
+        throw new UnsupportedOperationException(
+                "TestHarnessDataTypeFactory does not support 
createRawDataType(Class).");
+    }
+
+    @Override
+    public <T> DataType createRawDataType(TypeInformation<T> typeInfo) {
+        throw new UnsupportedOperationException(
+                "TestHarnessDataTypeFactory does not support 
createRawDataType(TypeInformation).");
+    }
+
+    @Override
+    public LogicalType createLogicalType(String typeString) {
+        throw new UnsupportedOperationException(
+                "TestHarnessDataTypeFactory does not support 
createLogicalType(String).");
+    }
+
+    @Override
+    public LogicalType createLogicalType(UnresolvedIdentifier identifier) {
+        throw new UnsupportedOperationException(
+                "TestHarnessDataTypeFactory does not support 
createLogicalType(UnresolvedIdentifier).");
+    }
+}
diff --git 
a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java
 
b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java
new file mode 100644
index 00000000000..fadf21d7dd9
--- /dev/null
+++ 
b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Optional;
+
+/** {@link TableSemantics} implementation for {@link 
ProcessTableFunctionTestHarness}. */
+@Internal
+class TestHarnessTableSemantics implements TableSemantics {
+    private final DataType dataType;
+    private final int[] partitionByColumns;
+
+    TestHarnessTableSemantics(DataType dataType, int[] partitionByColumns) {
+        this.dataType = dataType;
+        this.partitionByColumns = partitionByColumns;
+    }
+
+    @Override
+    public DataType dataType() {
+        return dataType;
+    }
+
+    @Override
+    public int[] partitionByColumns() {
+        return partitionByColumns;
+    }
+
+    @Override
+    public int[] orderByColumns() {
+        return new int[0];
+    }
+
+    @Override
+    public TableSemantics.SortDirection[] orderByDirections() {
+        return new TableSemantics.SortDirection[0];
+    }
+
+    @Override
+    public int timeColumn() {
+        return -1;
+    }
+
+    @Override
+    public Optional<ChangelogMode> changelogMode() {
+        return Optional.empty();
+    }
+}
diff --git 
a/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java
 
b/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java
new file mode 100644
index 00000000000..a02e9a8eefd
--- /dev/null
+++ 
b/flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java
@@ -0,0 +1,1065 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.ArgumentTrait;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.StateHint;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class ProcessTableFunctionTestHarnessTest {
+
+    @DataTypeHint("ROW<value INT>")
+    public static class PassthroughPTF extends ProcessTableFunction<Row> {
+        public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row 
input) {
+            collect(input);
+        }
+    }
+
+    /** Passthrough PTF for testing field ordering. */
+    @DataTypeHint("ROW<user STRING, value INT>")
+    public static class UserValuePassthroughPTF extends 
ProcessTableFunction<Row> {
+        public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row 
input) {
+            collect(input);
+        }
+    }
+
+    /** Filter PTF for testing scalar argument handling. */
+    @DataTypeHint("ROW<value INT>")
+    public static class FilterPTF extends ProcessTableFunction<Row> {
+        public void eval(
+                @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input,
+                @ArgumentHint(ArgumentTrait.SCALAR) Integer threshold) {
+            // Use named field access - converter enriches Row with field names
+            int value = input.getFieldAs("value");
+            if (value >= threshold) {
+                collect(input);
+            }
+        }
+    }
+
+    /** PTF for testing transformation of output types. */
+    @DataTypeHint("ROW<doubled INT, original INT>")
+    public static class DoublePTF extends ProcessTableFunction<Row> {
+        public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row 
input) {
+            int value = input.getFieldAs("value");
+            collect(Row.of(value * 2, value));
+        }
+    }
+
+    /** PTF with for testing table argument names set via argument hints. */
+    @DataTypeHint("ROW<value INT>")
+    public static class ExplicitNamePTF extends ProcessTableFunction<Row> {
+        public void eval(
+                @ArgumentHint(value = ArgumentTrait.ROW_SEMANTIC_TABLE, name = 
"customName")
+                        Row actualParamName) {
+            collect(actualParamName);
+        }
+    }
+
+    /** PTF with inline type annotation - no builder config needed. */
+    @DataTypeHint("ROW<doubled INT>")
+    public static class InlineTypePTF extends ProcessTableFunction<Row> {
+        public void eval(
+                @ArgumentHint(
+                                value = ArgumentTrait.ROW_SEMANTIC_TABLE,
+                                type = @DataTypeHint("ROW<value INT>"))
+                        Row input) {
+            int value = input.getFieldAs("value");
+            collect(Row.of(value * 2));
+        }
+    }
+
+    @DataTypeHint("ROW<value INT>")
+    public static class PartitionedPTF extends ProcessTableFunction<Row> {
+        public void eval(@ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row 
input) {
+            collect(Row.of((Integer) input.getFieldAs("value")));
+        }
+    }
+
+    /**
+     * PTF with PASS_COLUMNS_THROUGH for validating that all input columns are 
prepended to output.
+     */
+    @DataTypeHint("ROW<doubled INT>")
+    public static class PassColumnsThroughPTF extends 
ProcessTableFunction<Row> {
+        public void eval(
+                @ArgumentHint({
+                            ArgumentTrait.SET_SEMANTIC_TABLE,
+                            ArgumentTrait.PASS_COLUMNS_THROUGH
+                        })
+                        Row input) {
+            int value = input.getFieldAs("value");
+            collect(Row.of(value * 2));
+        }
+    }
+
+    /** PTF with OPTIONAL_PARTITION_BY for validating that partition setup can 
be omitted. */
+    @DataTypeHint("ROW<doubled INT>")
+    public static class OptionalPartitionPTF extends ProcessTableFunction<Row> 
{
+
+        public void eval(
+                @ArgumentHint({
+                            ArgumentTrait.SET_SEMANTIC_TABLE,
+                            ArgumentTrait.OPTIONAL_PARTITION_BY
+                        })
+                        Row input) {
+            int value = input.getFieldAs("value");
+            collect(Row.of(value * 2));
+        }
+    }
+
+    /** Simple POJO for testing structured type input/output. */
+    public static class User {
+        public String name;
+        public int age;
+
+        public User() {}
+
+        public User(String name, int age) {
+            this.name = name;
+            this.age = age;
+        }
+
+        @Override
+        public String toString() {
+            return "User{name='" + name + "', age=" + age + '}';
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            User user = (User) o;
+            return age == user.age && java.util.Objects.equals(name, 
user.name);
+        }
+
+        @Override
+        public int hashCode() {
+            return java.util.Objects.hash(name, age);
+        }
+    }
+
+    /** PTF for testing structured type inputs. */
+    @DataTypeHint("ROW<name STRING, age INT>")
+    public static class UserPTF extends ProcessTableFunction<Row> {
+        public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) User 
user) {
+            if (user.age >= 18) {
+                collect(Row.of(user.name, user.age));
+            }
+        }
+    }
+
+    /** PTF that transforms structured type inputs and outputs. */
+    public static class UserTransformPTF extends ProcessTableFunction<User> {
+        public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) User 
user) {
+            User transformed = new User(user.name, user.age + 1);
+            collect(transformed);
+        }
+    }
+
+    /** Invalid PTF - uses reserved argument name "on_time". */
+    @DataTypeHint("ROW<value INT>")
+    public static class InvalidReservedArgOnTimePTF extends 
ProcessTableFunction<Row> {
+        public void eval(
+                @ArgumentHint(value = ArgumentTrait.ROW_SEMANTIC_TABLE, name = 
"on_time")
+                        Row input) {
+            collect(input);
+        }
+    }
+
+    /** Invalid PTF - uses reserved argument name "uid". */
+    @DataTypeHint("ROW<value INT>")
+    public static class InvalidReservedArgUidPTF extends 
ProcessTableFunction<Row> {
+        public void eval(
+                @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input,
+                @ArgumentHint(ArgumentTrait.SCALAR) String uid) {
+            collect(input);
+        }
+    }
+
+    /** Multi-table PTF for validating multi-input processing. */
+    @DataTypeHint("ROW<output STRING>")
+    public static class MultiTableUnionPTF extends ProcessTableFunction<Row> {
+        public void eval(
+                @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row leftTable,
+                @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row 
rightTable) {
+            if (leftTable != null) {
+                collect(Row.of("LEFT: " + leftTable));
+            }
+            if (rightTable != null) {
+                collect(Row.of("RIGHT: " + rightTable));
+            }
+        }
+    }
+
+    /**
+     * Multi-table PTF with one POJO and one Row argument, for testing 
partitioning with structured
+     * types.
+     */
+    @DataTypeHint("ROW<source STRING, age INT>")
+    public static class MixedTypeMultiTablePTF extends 
ProcessTableFunction<Row> {
+        public void eval(
+                @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) User userTable,
+                @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row rowTable) {
+            if (userTable != null) {
+                collect(Row.of("USER", userTable.age));
+            }
+            if (rowTable != null) {
+                collect(Row.of("ROW", rowTable.getFieldAs("age")));
+            }
+        }
+    }
+
+    /**
+     * Invalid PTF - uses PASS_COLUMNS_THROUGH with multiple table arguments 
(not allowed per Flink
+     * docs).
+     */
+    @DataTypeHint("ROW<output STRING>")
+    public static class InvalidPassColumnsThroughMultiTablePTF extends 
ProcessTableFunction<Row> {
+        public void eval(
+                @ArgumentHint({
+                            ArgumentTrait.SET_SEMANTIC_TABLE,
+                            ArgumentTrait.PASS_COLUMNS_THROUGH
+                        })
+                        Row leftTable,
+                @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row 
rightTable) {
+            if (leftTable != null) {
+                collect(Row.of("LEFT: " + leftTable));
+            }
+            if (rightTable != null) {
+                collect(Row.of("RIGHT: " + rightTable));
+            }
+        }
+    }
+
+    /** PTF with only scalar arguments, no tables. */
+    @DataTypeHint("ROW<sum INT>")
+    public static class ScalarOnlyPTF extends ProcessTableFunction<Row> {
+        public void eval(
+                @ArgumentHint(ArgumentTrait.SCALAR) Integer a,
+                @ArgumentHint(ArgumentTrait.SCALAR) Integer b) {
+            collect(Row.of(a + b));
+        }
+    }
+
+    /** PTF with Context parameter - should be rejected by test harness. */
+    @DataTypeHint("ROW<value INT>")
+    public static class PTFWithContext extends ProcessTableFunction<Row> {
+        public void eval(Context ctx, 
@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
+            collect(input);
+        }
+    }
+
+    /** PTF with State parameter - should be rejected by test harness. */
+    @DataTypeHint("ROW<value INT>")
+    public static class PTFWithState extends ProcessTableFunction<Row> {
+        public static class CountState {
+            public long counter = 0L;
+        }
+
+        public void eval(
+                @StateHint CountState state,
+                @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
+            collect(input);
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Builder Configuration Tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testBuilderRejectsDuplicateScalarArguments() {
+        Exception exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> {
+                            
ProcessTableFunctionTestHarness.ofClass(FilterPTF.class)
+                                    .withTableArgument("input", 
DataTypes.of("ROW<value INT>"))
+                                    .withScalarArgument("threshold", 50)
+                                    .withScalarArgument("threshold", 100);
+                        });
+
+        assertThat(exception.getMessage()).contains("threshold");
+    }
+
+    @Test
+    void testBuilderRejectsDuplicateTableArguments() {
+        Exception exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> {
+                            
ProcessTableFunctionTestHarness.ofClass(MultiTableUnionPTF.class)
+                                    .withTableArgument(
+                                            "leftTable", DataTypes.of("ROW<id 
INT, name STRING>"))
+                                    .withTableArgument(
+                                            "leftTable", DataTypes.of("ROW<id 
INT, value INT>"));
+                        });
+
+        assertThat(exception.getMessage()).contains("leftTable");
+    }
+
+    @Test
+    void testBuilderRejectsMixedDuplicateArguments() {
+        Exception exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> {
+                            
ProcessTableFunctionTestHarness.ofClass(FilterPTF.class)
+                                    .withTableArgument("input", 
DataTypes.of("ROW<value INT>"))
+                                    .withScalarArgument("input", 42);
+                        });
+
+        assertThat(exception.getMessage()).contains("input");
+    }
+
+    @Test
+    void testBuilderRejectsReservedArgumentOnTime() {
+        // We should reject PTFs that use reserved argument name "on_time"
+        ProcessTableFunctionTestHarness.Builder harnessBuilder =
+                
ProcessTableFunctionTestHarness.ofClass(InvalidReservedArgOnTimePTF.class)
+                        .withTableArgument("on_time", DataTypes.of("ROW<id 
INT>"));
+
+        ValidationException exception =
+                assertThrows(
+                        ValidationException.class,
+                        () -> {
+                            harnessBuilder.build();
+                        });
+
+        assertThat(exception.getMessage())
+                .contains("Function signature must not declare system 
arguments")
+                .contains("on_time");
+    }
+
+    @Test
+    void testBuilderRejectsReservedArgumentUid() {
+        // We should reject PTFs that use reserved argument name "uid"
+        ProcessTableFunctionTestHarness.Builder harnessBuilder =
+                
ProcessTableFunctionTestHarness.ofClass(InvalidReservedArgUidPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<id 
INT>"))
+                        .withScalarArgument("uid", "my-id");
+
+        ValidationException exception =
+                assertThrows(
+                        ValidationException.class,
+                        () -> {
+                            harnessBuilder.build();
+                        });
+
+        assertThat(exception.getMessage())
+                .contains("Function signature must not declare system 
arguments")
+                .contains("uid");
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Argument Configuration Tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testExplicitNameTakesPrecedence() throws Exception {
+        // Verify that @ArgumentHint(name="customName") takes precedence over 
actual parameter
+        // name when processing elements and calling eval.
+
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(ExplicitNamePTF.class)
+                        .withTableArgument("customName", 
DataTypes.of("ROW<value INT>"))
+                        .build()) {
+
+            harness.processElement(Row.of(42));
+            harness.processElement(Row.of(100));
+
+            List<Row> output = harness.getOutput();
+            assertThat(output).hasSize(2);
+            assertThat(output.get(0).getField("value")).isEqualTo(42);
+            assertThat(output.get(1).getField("value")).isEqualTo(100);
+        }
+    }
+
+    @Test
+    void testScalarOnlyPTF() throws Exception {
+        // Test scalar-only PTF with no table arguments
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(ScalarOnlyPTF.class)
+                        .withScalarArgument("a", 10)
+                        .withScalarArgument("b", 20)
+                        .build()) {
+
+            harness.process();
+
+            List<Row> output = harness.getOutput();
+
+            assertThat(output).hasSize(1);
+            assertThat(output.get(0).getField("sum")).isEqualTo(30);
+        }
+    }
+
+    @Test
+    void testScalarOnlyPTFWithWrongArgumentTypes() {
+        Exception exception =
+                assertThrows(
+                        IllegalStateException.class,
+                        () -> {
+                            
ProcessTableFunctionTestHarness.ofClass(ScalarOnlyPTF.class)
+                                    .withScalarArgument("a", "not_an_integer")
+                                    .withScalarArgument("b", 20)
+                                    .build();
+                        });
+
+        assertThat(exception.getMessage()).contains("Type mismatch");
+        assertThat(exception.getMessage()).contains("java.lang.Integer");
+        assertThat(exception.getMessage()).contains("java.lang.String");
+    }
+
+    @Test
+    void testInvokeRejectsTableArguments() throws Exception {
+        // Verify that invoke() rejects PTFs with table arguments
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(FilterPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<value 
INT>"))
+                        .withScalarArgument("threshold", 50)
+                        .build()) {
+
+            Exception exception =
+                    assertThrows(
+                            IllegalStateException.class,
+                            () -> {
+                                harness.process();
+                            });
+
+            assertThat(exception.getMessage()).contains("process() is only for 
scalar-only PTFs");
+        }
+    }
+
+    @Test
+    void testTableProcessingWithScalarArgument() throws Exception {
+        // Test a PTF that uses a scalar parameter
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(FilterPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<value 
INT>"))
+                        .withScalarArgument("threshold", 50) // Scalar 
argument: threshold = 50
+                        .build()) {
+
+            harness.processElement(Row.of(25));
+            harness.processElement(Row.of(75));
+            harness.processElement(Row.of(50));
+            harness.processElement(Row.of(10));
+            harness.processElement(Row.of(100));
+
+            List<Row> output = harness.getOutput();
+
+            assertThat(output).hasSize(3);
+            assertThat(output.get(0).getField("value")).isEqualTo(75);
+            assertThat(output.get(1).getField("value")).isEqualTo(50);
+            assertThat(output.get(2).getField("value")).isEqualTo(100);
+        }
+    }
+
+    @Test
+    void testTableProcessingWithScalarArgumentWrongType() {
+        Exception exception =
+                assertThrows(
+                        IllegalStateException.class,
+                        () -> {
+                            
ProcessTableFunctionTestHarness.ofClass(FilterPTF.class)
+                                    .withTableArgument("input", 
DataTypes.of("ROW<value INT>"))
+                                    .withScalarArgument("threshold", 
"not_an_integer")
+                                    .build();
+                        });
+
+        assertThat(exception.getMessage()).contains("Type mismatch");
+        assertThat(exception.getMessage()).contains("java.lang.Integer");
+        assertThat(exception.getMessage()).contains("java.lang.String");
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Argument Trait Tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testProcessElementWithRowKind() throws Exception {
+        // Verify RowKind is preserved through processing (ROW_SEMANTIC_TABLE)
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<value 
INT>"))
+                        .build()) {
+
+            harness.processElement(RowKind.INSERT, 10);
+            harness.processElement(RowKind.UPDATE_BEFORE, 15);
+            harness.processElement(RowKind.UPDATE_AFTER, 20);
+            harness.processElement(RowKind.DELETE, 30);
+
+            List<Row> output = harness.getOutput();
+            assertThat(output).hasSize(4);
+            assertThat(output.get(0).getKind()).isEqualTo(RowKind.INSERT);
+            assertThat(output.get(0).getField("value")).isEqualTo(10);
+            
assertThat(output.get(1).getKind()).isEqualTo(RowKind.UPDATE_BEFORE);
+            assertThat(output.get(1).getField("value")).isEqualTo(15);
+            
assertThat(output.get(2).getKind()).isEqualTo(RowKind.UPDATE_AFTER);
+            assertThat(output.get(2).getField("value")).isEqualTo(20);
+            assertThat(output.get(3).getKind()).isEqualTo(RowKind.DELETE);
+            assertThat(output.get(3).getField("value")).isEqualTo(30);
+        }
+    }
+
+    @Test
+    void testPassColumnsThroughTrait() throws Exception {
+        // Verify PASS_COLUMNS_THROUGH prepends ALL input columns (not just 
partition keys)
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(PassColumnsThroughPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<key 
STRING, value INT>"))
+                        .withPartitionBy("input", "key")
+                        .build()) {
+
+            harness.processElement(Row.of("A", 10));
+            harness.processElement(Row.of("B", 20));
+
+            List<Row> output = harness.getOutput();
+            assertThat(output).hasSize(2);
+
+            assertThat(output.get(0)).isEqualTo(Row.of("A", 10, 20));
+            assertThat(output.get(1)).isEqualTo(Row.of("B", 20, 40));
+        }
+    }
+
+    @Test
+    void testOptionalPartitionByWithoutPartition() throws Exception {
+        // Verify OPTIONAL_PARTITION_BY allows omitting partition configuration
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(OptionalPartitionPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<key 
STRING, value INT>"))
+                        .build()) {
+
+            harness.processElement(Row.of("A", 10));
+            harness.processElement(Row.of("B", 20));
+            harness.processElement(Row.of("C", 30));
+
+            List<Row> output = harness.getOutput();
+            assertThat(output).hasSize(3);
+
+            assertThat(output.get(0)).isEqualTo(Row.of(20));
+            assertThat(output.get(1)).isEqualTo(Row.of(40));
+            assertThat(output.get(2)).isEqualTo(Row.of(60));
+        }
+    }
+
+    @Test
+    void testOptionalPartitionByWithPartition() throws Exception {
+        // Verify OPTIONAL_PARTITION_BY still works when partition is 
configured
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(OptionalPartitionPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<key 
STRING, value INT>"))
+                        .withPartitionBy("input", "key")
+                        .build()) {
+
+            harness.processElement(Row.of("A", 10));
+            harness.processElement(Row.of("A", 5));
+            harness.processElement(Row.of("B", 20));
+
+            List<Row> output = harness.getOutput();
+            assertThat(output).hasSize(3);
+
+            assertThat(output.get(0)).isEqualTo(Row.of("A", 20));
+            assertThat(output.get(1)).isEqualTo(Row.of("A", 10));
+            assertThat(output.get(2)).isEqualTo(Row.of("B", 40));
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Data Type Conversion Tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testNamedRowFieldOrdering() throws Exception {
+        // Test what happens when Row field order differs from DataType schema 
order
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(UserValuePassthroughPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<user 
STRING, value INT>"))
+                        .build()) {
+
+            Row rowA = Row.withNames();
+            rowA.setField("value", 100);
+            rowA.setField("user", "Alice");
+
+            harness.processElement(rowA);
+
+            List<Row> output = harness.getOutput();
+            assertThat(output).hasSize(1);
+
+            Row result = output.get(0);
+
+            // Named field access
+            assertThat(result.getField("user")).isEqualTo("Alice");
+            assertThat(result.getField("value")).isEqualTo(100);
+        }
+    }
+
+    @Test
+    void testPositionalRowWithWrongTypeOrder() throws Exception {
+        // Verify that type mismatches are caught when Row values don't match 
schema types
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(UserValuePassthroughPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<user 
STRING, value INT>"))
+                        .build()) {
+
+            Row wrongOrderRow = Row.of(10, "Alice");
+
+            Exception exception =
+                    assertThrows(
+                            ClassCastException.class, () -> 
harness.processElement(wrongOrderRow));
+
+            assertThat(exception.getMessage()).contains("Integer");
+            assertThat(exception.getMessage()).contains("String");
+            assertThat(exception.getMessage()).contains("cannot be cast");
+        }
+    }
+
+    @Test
+    void testStructuredTypeInput() throws Exception {
+        // Test that a PTF can declare an input type as a structured type,
+        // and that the harness can handle the conversion from Row into
+        // that type.
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(UserPTF.class)
+                        .withTableArgument("user")
+                        .build()) {
+
+            harness.processElement(Row.of("Alice", 25));
+            harness.processElement(Row.of("Bob", 17));
+
+            List<Row> output = harness.getOutput();
+            assertThat(output).hasSize(1);
+
+            Row result = output.get(0);
+            assertThat(result.getField("name")).isEqualTo("Alice");
+            assertThat(result.getField("age")).isEqualTo(25);
+        }
+    }
+
+    @Test
+    void testStructuredTypeInputAndOutput() throws Exception {
+        // Test PTF with structured type inputs and outputs
+        try (ProcessTableFunctionTestHarness<User> harness =
+                ProcessTableFunctionTestHarness.ofClass(UserTransformPTF.class)
+                        .withTableArgument("user")
+                        .build()) {
+
+            harness.processElement(Row.of("Alice", 25));
+
+            List<User> output = harness.getOutput();
+            assertThat(output).hasSize(1);
+
+            User result = output.get(0);
+            assertThat(result.getClass()).isEqualTo(User.class);
+            assertThat(result.name).isEqualTo("Alice");
+            assertThat(result.age).isEqualTo(26);
+        }
+    }
+
+    @Test
+    void testInlineTypeAnnotation() throws Exception {
+        // Verify that PTFs can declare table argument types via 
@ArgumentHint(type = ...)
+        // without needing .withTableArgument() configuration
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(InlineTypePTF.class).build()) {
+
+            harness.processElement(Row.of(5));
+            harness.processElement(Row.of(10));
+
+            List<Row> output = harness.getOutput();
+            assertThat(output).hasSize(2);
+            assertThat(output.get(0)).isEqualTo(Row.of(10));
+            assertThat(output.get(1)).isEqualTo(Row.of(20));
+        }
+    }
+
+    @Test
+    void testInlineTypeMatchesBuilderConfig() throws Exception {
+        // Verify that when both inline annotation and builder config are 
provided with matching
+        // types, the harness builds successfully
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(InlineTypePTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<value 
INT>"))
+                        .build()) {
+
+            harness.processElement(Row.of(7));
+
+            List<Row> output = harness.getOutput();
+            assertThat(output).hasSize(1);
+            assertThat(output.get(0)).isEqualTo(Row.of(14));
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Partitioning Tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testSetSemanticWithPartitionByName() throws Exception {
+        // Verify set-semantic table with partition configuration by column 
name
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<key 
STRING, value INT>"))
+                        .withPartitionBy("input", "key") // Partition by "key" 
column name
+                        .build()) {
+
+            harness.processElement(Row.of("X", 10));
+            harness.processElement(Row.of("Y", 20));
+            harness.processElement(Row.of("X", 30));
+
+            List<Row> output = harness.getOutput();
+            assertThat(output).hasSize(3);
+            assertThat(output.get(0)).isEqualTo(Row.of("X", 10));
+            assertThat(output.get(1)).isEqualTo(Row.of("Y", 20));
+            assertThat(output.get(2)).isEqualTo(Row.of("X", 30));
+        }
+    }
+
+    @Test
+    void testSetSemanticWithMultiplePartitionColumns() throws Exception {
+        // Verify composite partition key (multiple columns)
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class)
+                        .withTableArgument(
+                                "input",
+                                DataTypes.of("ROW<region STRING, country 
STRING, value INT>"))
+                        .withPartitionBy("input", "region", "country")
+                        .build()) {
+
+            harness.processElement(Row.of("EU", "DE", 100));
+            harness.processElement(Row.of("EU", "DE", 200));
+            harness.processElement(Row.of("EU", "FR", 300));
+            harness.processElement(Row.of("US", "NY", 400));
+
+            List<Row> output = harness.getOutput();
+            assertThat(output).hasSize(4);
+            assertThat(output.get(0)).isEqualTo(Row.of("EU", "DE", 100));
+            assertThat(output.get(1)).isEqualTo(Row.of("EU", "DE", 200));
+            assertThat(output.get(2)).isEqualTo(Row.of("EU", "FR", 300));
+            assertThat(output.get(3)).isEqualTo(Row.of("US", "NY", 400));
+        }
+    }
+
+    @Test
+    void testSetSemanticWithSelectivePartitioning() throws Exception {
+        // Verify that only partition columns are automatically included in 
output
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class)
+                        .withTableArgument(
+                                "input",
+                                DataTypes.of(
+                                        "ROW<id INT, region STRING, country 
STRING, city STRING, value INT>"))
+                        .withPartitionBy("input", "region")
+                        .build()) {
+
+            harness.processElement(Row.of(1, "EU", "DE", "Berlin", 100));
+            harness.processElement(Row.of(4, "US", "CA", "LA", 200));
+
+            List<Row> output = harness.getOutput();
+
+            assertThat(output.get(0)).isEqualTo(Row.of("EU", 100));
+            assertThat(output.get(1)).isEqualTo(Row.of("US", 200));
+        }
+    }
+
+    @Test
+    void testMultipleSetSemanticTablesWithMatchingPartitionKeys() throws 
Exception {
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(MultiTableUnionPTF.class)
+                        .withTableArgument("leftTable", DataTypes.of("ROW<name 
STRING, score INT>"))
+                        .withPartitionBy("leftTable", "name")
+                        .withTableArgument(
+                                "rightTable", DataTypes.of("ROW<name STRING, 
city STRING>"))
+                        .withPartitionBy("rightTable", "name")
+                        .build()) {
+
+            harness.processElementForTable("leftTable", Row.of("Alice", 100));
+            harness.processElementForTable("leftTable", Row.of("Bob", 200));
+
+            harness.processElementForTable("rightTable", Row.of("Alice", 
"Berlin"));
+            harness.processElementForTable("rightTable", Row.of("Bob", 
"London"));
+
+            List<Row> output = harness.getOutput();
+            assertThat(output).hasSize(4);
+
+            assertThat(output.get(0).getField("name")).isEqualTo("Alice");
+            assertThat(output.get(0).getField("name0")).isEqualTo("Alice");
+            assertThat(output.get(0).getField("output")).isEqualTo("LEFT: 
+I[Alice, 100]");
+
+            assertThat(output.get(1).getField("name")).isEqualTo("Bob");
+            assertThat(output.get(1).getField("name0")).isEqualTo("Bob");
+            assertThat(output.get(1).getField("output")).isEqualTo("LEFT: 
+I[Bob, 200]");
+
+            assertThat(output.get(2).getField("name")).isEqualTo("Alice");
+            assertThat(output.get(2).getField("name0")).isEqualTo("Alice");
+            assertThat(output.get(2).getField("output")).isEqualTo("RIGHT: 
+I[Alice, Berlin]");
+
+            assertThat(output.get(3).getField("name")).isEqualTo("Bob");
+            assertThat(output.get(3).getField("name0")).isEqualTo("Bob");
+            assertThat(output.get(3).getField("output")).isEqualTo("RIGHT: 
+I[Bob, London]");
+        }
+    }
+
+    @Test
+    void testMultipleSetSemanticTablesWithStructuredTypePartitioning() throws 
Exception {
+        // Verify that multi-table PTFs work when one argument is a structured 
type
+        // and another is a Row, both partitioned by the same field type
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(MixedTypeMultiTablePTF.class)
+                        .withTableArgument("userTable")
+                        .withPartitionBy("userTable", "age")
+                        .withTableArgument(
+                                "rowTable", DataTypes.of("ROW<name STRING, age 
INT NOT NULL>"))
+                        .withPartitionBy("rowTable", "age")
+                        .build()) {
+
+            harness.processElementForTable("userTable", Row.of("Alice", 25));
+            harness.processElementForTable("rowTable", Row.of("Bob", 30));
+
+            List<Row> output = harness.getOutput();
+            assertThat(output)
+                    .containsExactlyInAnyOrder(
+                            Row.of(25, 25, "USER", 25), Row.of(30, 30, "ROW", 
30));
+        }
+    }
+
+    @Test
+    void testMultipleSetSemanticTablesWithMismatchedPartitionTypes() {
+        // Verify that multi-table PTFs with inconsistent partition types are 
rejected
+        Exception exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> {
+                            
ProcessTableFunctionTestHarness.ofClass(MultiTableUnionPTF.class)
+                                    .withTableArgument(
+                                            "leftTable", DataTypes.of("ROW<id 
INT, name STRING>"))
+                                    .withPartitionBy("leftTable", "id")
+                                    .withTableArgument(
+                                            "rightTable",
+                                            DataTypes.of("ROW<key STRING, city 
STRING>"))
+                                    .withPartitionBy("rightTable", "key")
+                                    .build();
+                        });
+
+        assertThat(exception.getMessage()).contains("Inconsistent 
partitioning");
+    }
+
+    @Test
+    void testMultipleSetSemanticTablesWithMismatchedPartitionColumnCount() {
+        // Verify that multi-table PTFs with different partition column counts 
are rejected
+        Exception exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> {
+                            
ProcessTableFunctionTestHarness.ofClass(MultiTableUnionPTF.class)
+                                    .withTableArgument(
+                                            "leftTable",
+                                            DataTypes.of("ROW<id INT, region 
STRING, name STRING>"))
+                                    .withPartitionBy("leftTable", "id", 
"region")
+                                    .withTableArgument(
+                                            "rightTable", DataTypes.of("ROW<id 
INT, city STRING>"))
+                                    .withPartitionBy("rightTable", "id")
+                                    .build();
+                        });
+
+        assertThat(exception.getMessage()).contains("Inconsistent 
partitioning");
+    }
+
+    @Test
+    void testPassColumnsThroughWithMultipleTablesRejected() {
+        // Verify that PASS_COLUMNS_THROUGH is rejected when used with 
multiple table arguments
+        Exception exception =
+                assertThrows(
+                        org.apache.flink.table.api.ValidationException.class,
+                        () -> {
+                            ProcessTableFunctionTestHarness.ofClass(
+                                            
InvalidPassColumnsThroughMultiTablePTF.class)
+                                    .withTableArgument("leftTable", 
DataTypes.of("ROW<a INT>"))
+                                    .withTableArgument("rightTable", 
DataTypes.of("ROW<b INT>"))
+                                    .build();
+                        });
+
+        assertThat(exception.getMessage())
+                .contains("Pass-through columns")
+                .contains("multiple table arguments");
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Element Processing Tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testProcessElementOnMultiTableThrows() throws Exception {
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(MultiTableUnionPTF.class)
+                        .withTableArgument("leftTable", DataTypes.of("ROW<id 
INT, name STRING>"))
+                        .withTableArgument("rightTable", DataTypes.of("ROW<id 
INT, value STRING>"))
+                        .withPartitionBy("leftTable", "id")
+                        .withPartitionBy("rightTable", "id")
+                        .build()) {
+
+            Exception exception =
+                    assertThrows(
+                            IllegalStateException.class,
+                            () -> harness.processElement(Row.of(1, "Alice")));
+            assertThat(exception.getMessage())
+                    .contains("multiple table arguments")
+                    .contains("processElementForTable");
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Output Collection Tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testClearOutput() throws Exception {
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<value 
INT>"))
+                        .build()) {
+
+            harness.processElement(Row.of(10));
+            harness.processElement(Row.of(20));
+            assertThat(harness.getOutput()).hasSize(2);
+
+            harness.clearOutput();
+            assertThat(harness.getOutput()).isEmpty();
+
+            harness.processElement(Row.of(30));
+            assertThat(harness.getOutput()).hasSize(1);
+        }
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Error Cases Tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testProcessElementForTableWithInvalidName() throws Exception {
+        try (ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class)
+                        .withTableArgument("input", DataTypes.of("ROW<value 
INT>"))
+                        .build()) {
+
+            Exception exception =
+                    assertThrows(
+                            IllegalArgumentException.class,
+                            () -> 
harness.processElementForTable("nonexistent", Row.of(42)));
+            assertThat(exception.getMessage()).contains("nonexistent");
+        }
+    }
+
+    @Test
+    void testContextParameterRejected() {
+        Exception exception =
+                assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                
ProcessTableFunctionTestHarness.ofClass(PTFWithContext.class)
+                                        .withTableArgument("input", 
DataTypes.of("ROW<value INT>"))
+                                        .build());
+
+        assertThat(exception.getMessage())
+                .contains("does not yet support Context parameters")
+                .contains("Context parameter")
+                .contains("position 0");
+    }
+
+    @Test
+    void testStateParameterRejected() {
+        Exception exception =
+                assertThrows(
+                        IllegalStateException.class,
+                        () ->
+                                
ProcessTableFunctionTestHarness.ofClass(PTFWithState.class)
+                                        .withTableArgument("input", 
DataTypes.of("ROW<value INT>"))
+                                        .build());
+
+        assertThat(exception.getMessage())
+                .contains("does not yet support state parameters")
+                .contains("@StateHint parameter")
+                .contains("position 0");
+    }
+
+    @Test
+    void testSetSemanticMissingPartitionConfigThrows() {
+        Exception exception =
+                assertThrows(
+                        IllegalStateException.class,
+                        () -> {
+                            
ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class)
+                                    .withTableArgument(
+                                            "input", DataTypes.of("ROW<key 
STRING, value INT>"))
+                                    .build();
+                        });
+
+        assertThat(exception.getMessage()).contains("No partition 
configuration found");
+        assertThat(exception.getMessage()).contains("withPartitionBy");
+    }
+
+    @Test
+    void testPartitionByInvalidColumnName() {
+        Exception exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> {
+                            
ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class)
+                                    .withTableArgument(
+                                            "input", DataTypes.of("ROW<key 
STRING, value INT>"))
+                                    .withPartitionBy("input", "nonexistent")
+                                    .build();
+                        });
+
+        assertThat(exception.getMessage()).contains("not found");
+        assertThat(exception.getMessage()).contains("Available columns");
+    }
+
+    @Test
+    void testPartitionByDuplicateConfigThrows() {
+        Exception exception =
+                assertThrows(
+                        IllegalArgumentException.class,
+                        () -> {
+                            
ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class)
+                                    .withTableArgument(
+                                            "input", DataTypes.of("ROW<key 
STRING, value INT>"))
+                                    .withPartitionBy("input", "key") // First 
config
+                                    .withPartitionBy("input", "key"); // 
Duplicate - should fail
+                        });
+
+        assertThat(exception.getMessage()).contains("Partition config already 
exists");
+    }
+}
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index cec3742b46b..a72acfe1dd8 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -187,6 +187,13 @@ under the License.
                        <scope>test</scope>
                </dependency>
 
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-table-runtime</artifactId>
+                       <version>${project.version}</version>
+                       <scope>test</scope>
+               </dependency>
+
                <dependency>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>flink-table-runtime</artifactId>


Reply via email to