autophagy commented on code in PR #27928:
URL: https://github.com/apache/flink/pull/27928#discussion_r3217411510
##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2034,3 +2034,336 @@ Limitations
PTFs are in an early stage. The following limitations apply:
- PTFs cannot run in batch mode.
- Broadcast state
+
+Testing Process Table Functions
+-------------------------------
+
+The `ProcessTableFunctionTestHarness` provides a lightweight unit testing framework for Process Table
+Functions (PTFs). It is useful for validating PTF business logic, multi-table PTF behaviour, and
+error cases.
+
+To exercise the full Flink planner and runtime end to end, use integration tests instead.
+
+{{< top >}}
+
+### Quick Start
+
+{{< tabs "quickstart" >}}
+{{< tab "Java" >}}
+```java
+import org.apache.flink.table.annotation.*;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.functions.ProcessTableFunction;
+import org.apache.flink.table.runtime.functions.ProcessTableFunctionTestHarness;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+// PTF under test
+@DataTypeHint("ROW<doubled INT>")
+public class DoublePTF extends ProcessTableFunction<Row> {
+ public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
+ int value = input.getFieldAs("value");
+ collect(Row.of(value * 2));
+ }
+}
+
+// Test
+@Test
+void testDoublePTF() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(DoublePTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+ .build()) {
+
+ harness.processElement(Row.of(5));
+ harness.processElement(Row.of(10));
+
+ List<Row> output = harness.getOutput();
+ assertThat(output).hasSize(2);
+ assertThat(output.get(0)).isEqualTo(Row.of(10));
+ assertThat(output.get(1)).isEqualTo(Row.of(20));
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
+
+### Common Testing Scenarios
+
+#### Testing Row-Semantic Tables
+
+Use `.withTableArgument()` to configure the input table schema:
+
+{{< tabs "row-semantic" >}}
+{{< tab "Java" >}}
+```java
+public class PassthroughPTF extends ProcessTableFunction<Integer> {
+ public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
+ collect(input.getFieldAs("value"));
+ }
+}
+
+@Test
+void testPassthrough() throws Exception {
+ try (ProcessTableFunctionTestHarness<Integer> harness =
+ ProcessTableFunctionTestHarness.ofClass(PassthroughPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+ .build()) {
+
+ harness.processElement(Row.of(42));
+ harness.processElement(Row.of(100));
+
+ List<Integer> output = harness.getOutput();
+ assertThat(output).containsExactly(42, 100);
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Testing Set-Semantic Tables with Partitioning
+
+For `SET_SEMANTIC_TABLE`, use `.withPartitionBy()` to configure partition columns:
+
+{{< tabs "set-semantic" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<doubled INT>")
+public class PartitionedPTF extends ProcessTableFunction<Row> {
+ public void eval(@ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
+ int value = input.getFieldAs("value");
+ collect(Row.of(value * 2));
+ }
+}
+
+@Test
+void testPartitionedPTF() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PartitionedPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<key STRING, value INT>"))
+ .withPartitionBy("input", "key")
+ .build()) {
+
+ harness.processElement(Row.of("A", 10));
+ harness.processElement(Row.of("B", 20));
+
+ List<Row> output = harness.getOutput();
+ assertThat(output.get(0)).isEqualTo(Row.of("A", 20));
+ assertThat(output.get(1)).isEqualTo(Row.of("B", 40));
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Testing Multiple Table Arguments
+
+Use `processElementForTable()` to specify which table receives each row:
+
+{{< tabs "multi-table" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<output STRING>")
+public class JoinPTF extends ProcessTableFunction<Row> {
+ public void eval(
+ @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row left,
+ @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row right) {
+ if (left != null) {
+ collect(Row.of("LEFT: " + left));
+ }
+ if (right != null) {
+ collect(Row.of("RIGHT: " + right));
+ }
+ }
+}
+
+@Test
+void testMultiTable() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(JoinPTF.class)
+ .withTableArgument("left", DataTypes.of("ROW<id INT, name STRING>"))
+ .withPartitionBy("left", "id")
+ .withTableArgument("right", DataTypes.of("ROW<id INT, city STRING>"))
+ .withPartitionBy("right", "id")
+ .build()) {
+
+ // Use processElementForTable() to target specific tables
+ harness.processElementForTable("left", Row.of(1, "Alice"));
+ harness.processElementForTable("right", Row.of(1, "Berlin"));
+
+ List<Row> output = harness.getOutput();
+ assertThat(output.get(0)).isEqualTo(Row.of(1, null, "LEFT: +I[1, Alice]"));
+ assertThat(output.get(1)).isEqualTo(Row.of(null, 1, "RIGHT: +I[1, Berlin]"));
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Testing with Scalar Arguments
+
+Use `.withScalarArgument()` to configure scalar parameter values:
+
+{{< tabs "scalar-args" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<value INT>")
+public class FilterPTF extends ProcessTableFunction<Row> {
+ public void eval(
+ @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input,
+ int threshold) {
+ int value = input.getFieldAs("value");
+ if (value > threshold) {
+ collect(Row.of(value));
+ }
+ }
+}
+
+@Test
+void testFilter() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(FilterPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+ .withScalarArgument("threshold", 50) // Configure scalar value
+ .build()) {
+
+ harness.processElement(Row.of(30));
+ harness.processElement(Row.of(70));
+
+ List<Row> output = harness.getOutput();
+ assertThat(output).containsExactly(Row.of(70));
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Scalar-Only PTFs**: For PTFs with only scalar arguments, use `process()` to trigger evaluation:
+
+{{< tabs "scalar-only" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<sum INT>")
+public class AddPTF extends ProcessTableFunction<Row> {
+ public void eval(int a, int b) {
+ collect(Row.of(a + b));
+ }
+}
+
+@Test
+void testScalarOnly() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(AddPTF.class)
+ .withScalarArgument("a", 5)
+ .withScalarArgument("b", 7)
+ .build()) {
+
+ harness.process(); // Use process() instead of processElement()
+
+ List<Row> output = harness.getOutput();
+ assertThat(output).containsExactly(Row.of(12));
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### Configuring Table Argument Types
+
+The harness supports two ways to specify table argument types:
+
+{{< tabs "type-config" >}}
+{{< tab "Java" >}}
+```java
+// Option 1: Inline type annotation using DataTypeHint
+@DataTypeHint("ROW<doubled INT>")
+public class InlineTypePTF extends ProcessTableFunction<Row> {
+ public void eval(
+ @ArgumentHint(
+ value = ArgumentTrait.ROW_SEMANTIC_TABLE,
+ type = @DataTypeHint("ROW<value INT>")
+ ) Row input) {
+ int value = input.getFieldAs("value");
+ collect(Row.of(value * 2));
+ }
+}
+
+@Test
+void testInlineType() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(InlineTypePTF.class).build()) {
+
+ harness.processElement(Row.of(5));
+ assertThat(harness.getOutput()).containsExactly(Row.of(10));
+ }
+}
+
+// Option 2: Builder configuration
+@Test
+void testBuilderType() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(DoublePTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<value INT>"))
+ .build()) {
+
+ harness.processElement(Row.of(5));
+ assertThat(harness.getOutput()).containsExactly(Row.of(10));
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Structured Types**: The harness supports structured POJO types in addition to `Row`, both as PTF
+inputs and outputs:
+
+{{< tabs "pojo-types" >}}
+{{< tab "Java" >}}
+```java
+public static class Customer {
+ public String name;
+ public int age;
+}
+
+public class CustomerPTF extends ProcessTableFunction<Customer> {
+ public void eval(@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Customer c) {
+ collect(c);
+ }
+}
+
+@Test
+void testPOJO() throws Exception {
+ try (ProcessTableFunctionTestHarness<Customer> harness =
+ ProcessTableFunctionTestHarness.ofClass(CustomerPTF.class)
+ .withTableArgument("input", DataTypes.of(Customer.class))
+ .build()) {
+
+ harness.processElement(Row.of("Alice", 30));
+
+ List<Customer> output = harness.getOutput();
+ assertThat(output.get(0).name).isEqualTo("Alice");
+ assertThat(output.get(0).age).isEqualTo(30);
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
+
+### Unsupported Features
Review Comment:
Oops, thank you for the pointer 😅 Yeah, a silly mistake from merging the
independent testing page into the general PTF page
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]