fhueske commented on code in PR #28212:
URL: https://github.com/apache/flink/pull/28212#discussion_r3274273333
##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2275,6 +2275,110 @@ void testScalarOnly() throws Exception {
{{< /tab >}}
{{< /tabs >}}
+#### Testing with State
+
+The harness supports structured types, `ListView`, and `MapView`:
+
+{{< tabs "state-testing" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<name STRING, count BIGINT>")
+public class CounterPTF extends ProcessTableFunction<Row> {
+ public static class CountState {
+ public long count = 0L;
+ }
+
+ public void eval(
+ @StateHint CountState state,
+ @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
+ state.count++;
+ String name = input.getFieldAs("name");
+ collect(Row.of(name, state.count));
+ }
+}
+
+@Test
+void testWithState() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(CounterPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build()) {
+
+ harness.processElement(Row.of("Alice", 10));
+ harness.processElement(Row.of("Alice", 20));
+
+ List<Row> output = harness.getOutput();
+ assertThat(output.get(0)).isEqualTo(Row.of("Alice", 1L));
+ assertThat(output.get(1)).isEqualTo(Row.of("Alice", 2L));
Review Comment:
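Is this correct?
Shouldn't the framework automatically prepend the `PARTITION BY` column, so
that the result would be `Row.of("Alice", "Alice", 1L)`? For set-semantic
table arguments, the output rows of a PTF are prefixed with the partition key columns.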
##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2275,6 +2275,110 @@ void testScalarOnly() throws Exception {
{{< /tab >}}
{{< /tabs >}}
+#### Testing with State
+
+The harness supports structured types, `ListView`, and `MapView`:
+
+{{< tabs "state-testing" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<name STRING, count BIGINT>")
+public class CounterPTF extends ProcessTableFunction<Row> {
+ public static class CountState {
+ public long count = 0L;
+ }
+
+ public void eval(
+ @StateHint CountState state,
+ @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
+ state.count++;
+ String name = input.getFieldAs("name");
+ collect(Row.of(name, state.count));
+ }
+}
+
+@Test
+void testWithState() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(CounterPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build()) {
+
+ harness.processElement(Row.of("Alice", 10));
+ harness.processElement(Row.of("Alice", 20));
+
+ List<Row> output = harness.getOutput();
+ assertThat(output.get(0)).isEqualTo(Row.of("Alice", 1L));
+ assertThat(output.get(1)).isEqualTo(Row.of("Alice", 2L));
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Initial State Setup**: Use `.withInitialStateArgument()` to pre-populate
state before processing:
+
+{{< tabs "initial-state" >}}
+{{< tab "Java" >}}
+```java
+@Test
+void testWithInitialState() throws Exception {
+ CounterPTF.CountState initialState = new CounterPTF.CountState();
+ initialState.count = 100L;
+
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(CounterPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .withInitialStateArgument("state", Row.of("Alice"), initialState)
Review Comment:
```suggestion
.withInitialStateArgument("state", Row.of("Alice"), initialState) //
initialize state for key "Alice"
```
Should we also explain that initial state must be set per partition key?
##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2275,6 +2275,110 @@ void testScalarOnly() throws Exception {
{{< /tab >}}
{{< /tabs >}}
+#### Testing with State
+
+The harness supports structured types, `ListView`, and `MapView`:
+
+{{< tabs "state-testing" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<name STRING, count BIGINT>")
+public class CounterPTF extends ProcessTableFunction<Row> {
+ public static class CountState {
+ public long count = 0L;
+ }
+
+ public void eval(
+ @StateHint CountState state,
Review Comment:
nit
```suggestion
@StateHint CountState cntState,
```
to include the purpose in the name?
##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2275,6 +2275,110 @@ void testScalarOnly() throws Exception {
{{< /tab >}}
{{< /tabs >}}
+#### Testing with State
+
+The harness supports structured types, `ListView`, and `MapView`:
+
+{{< tabs "state-testing" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<name STRING, count BIGINT>")
+public class CounterPTF extends ProcessTableFunction<Row> {
+ public static class CountState {
+ public long count = 0L;
+ }
+
+ public void eval(
+ @StateHint CountState state,
+ @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
+ state.count++;
+ String name = input.getFieldAs("name");
+ collect(Row.of(name, state.count));
+ }
+}
+
+@Test
+void testWithState() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(CounterPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build()) {
+
+ harness.processElement(Row.of("Alice", 10));
+ harness.processElement(Row.of("Alice", 20));
+
+ List<Row> output = harness.getOutput();
+ assertThat(output.get(0)).isEqualTo(Row.of("Alice", 1L));
+ assertThat(output.get(1)).isEqualTo(Row.of("Alice", 2L));
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Initial State Setup**: Use `.withInitialStateArgument()` to pre-populate
state before processing:
Review Comment:
```suggestion
**Initial State Setup**: Use `.withInitialStateArgument()` to pre-populate
state for a key before processing:
```
?
##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2275,6 +2275,110 @@ void testScalarOnly() throws Exception {
{{< /tab >}}
{{< /tabs >}}
+#### Testing with State
+
+The harness supports structured types, `ListView`, and `MapView`:
+
+{{< tabs "state-testing" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<name STRING, count BIGINT>")
+public class CounterPTF extends ProcessTableFunction<Row> {
+ public static class CountState {
+ public long count = 0L;
+ }
+
+ public void eval(
+ @StateHint CountState state,
+ @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
+ state.count++;
+ String name = input.getFieldAs("name");
+ collect(Row.of(name, state.count));
+ }
+}
+
+@Test
+void testWithState() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(CounterPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build()) {
+
+ harness.processElement(Row.of("Alice", 10));
+ harness.processElement(Row.of("Alice", 20));
+
+ List<Row> output = harness.getOutput();
+ assertThat(output.get(0)).isEqualTo(Row.of("Alice", 1L));
+ assertThat(output.get(1)).isEqualTo(Row.of("Alice", 2L));
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Initial State Setup**: Use `.withInitialStateArgument()` to pre-populate
state before processing:
+
+{{< tabs "initial-state" >}}
+{{< tab "Java" >}}
+```java
+@Test
+void testWithInitialState() throws Exception {
+ CounterPTF.CountState initialState = new CounterPTF.CountState();
+ initialState.count = 100L;
+
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(CounterPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .withInitialStateArgument("state", Row.of("Alice"), initialState)
+ .build()) {
+
+ harness.processElement(Row.of("Alice", 10));
+
+ List<Row> output = harness.getOutput();
+ assertThat(output).containsExactly(Row.of("Alice", 101L));
Review Comment:
Same question about the output schema as above: shouldn't the `PARTITION BY`
column be prepended, i.e. `Row.of("Alice", "Alice", 101L)`?
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -270,43 +272,69 @@ public void clearOutput() {
output.clear();
}
- /**
- * Given a target table argument and a row to process, construct the right
set of arguments for
- * the PTF's eval function and attempt to invoke it.
- */
- private void invokeEval(ArgumentInfo activeTableArg, Row activeRow) throws
Exception {
- // Set collector context so it can prepend columns if needed
- collector.setContext(activeTableArg, activeRow);
+ /** Get state for a specific partition key (test introspection). */
+ public <T> T getStateForKey(String stateName, Row partitionKey) {
+ return stateManager.getStateForKey(stateName, partitionKey);
+ }
- Object[] args = new Object[arguments.size()];
+ /** Set state for a specific partition key (test introspection). */
+ public void setStateForKey(String stateName, Row partitionKey, Object
state) throws Exception {
+ stateManager.setInitialState(stateName, partitionKey, state);
+ }
- for (int i = 0; i < arguments.size(); i++) {
- ArgumentInfo arg = arguments.get(i);
+ /** Get all partition keys that have a specific state entry (test
introspection). */
+ public Set<Row> getStateKeys(String stateName) {
+ return stateManager.getStateKeys(stateName);
+ }
- if (arg.isTableArgument && arg.name.equals(activeTableArg.name)) {
- // If the argument is the active table argument, first convert
the input row
- // to an internal RowData type, and then convert the RowData
to type that the
- // argument expects. For Rows, this will structure the Row
based on the table
- // argument structure. Otherwise, for POJOs, it will pass the
expected POJO to eval.
+ /** Get all state values for a state name across all partitions. */
+ public <T> Map<Row, T> getAllState(String stateName) {
+ return stateManager.getAllState(stateName);
+ }
- ConverterPair pair = argumentConverters.get(arg.name);
+ /** Clear all state for a given partition. */
+ public void clearStateForPartition(Row partitionKey) {
+ stateManager.clearStateForPartition(partitionKey);
Review Comment:
```suggestion
public void clearAllStateForKey(Row partitionKey) {
stateManager.clearAllStateForKey(partitionKey);
```
for consistency with the `getStateForKey()` and `setStateForKey()` methods.
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -270,43 +272,69 @@ public void clearOutput() {
output.clear();
}
- /**
- * Given a target table argument and a row to process, construct the right
set of arguments for
- * the PTF's eval function and attempt to invoke it.
- */
- private void invokeEval(ArgumentInfo activeTableArg, Row activeRow) throws
Exception {
- // Set collector context so it can prepend columns if needed
- collector.setContext(activeTableArg, activeRow);
+ /** Get state for a specific partition key (test introspection). */
+ public <T> T getStateForKey(String stateName, Row partitionKey) {
+ return stateManager.getStateForKey(stateName, partitionKey);
+ }
- Object[] args = new Object[arguments.size()];
+ /** Set state for a specific partition key (test introspection). */
+ public void setStateForKey(String stateName, Row partitionKey, Object
state) throws Exception {
+ stateManager.setInitialState(stateName, partitionKey, state);
Review Comment:
```suggestion
stateManager.setStateForKey(stateName, partitionKey, state);
```
The non-public state manager can then use `setStateForKey()` for all write
accesses.
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -270,43 +272,69 @@ public void clearOutput() {
output.clear();
}
- /**
- * Given a target table argument and a row to process, construct the right
set of arguments for
- * the PTF's eval function and attempt to invoke it.
- */
- private void invokeEval(ArgumentInfo activeTableArg, Row activeRow) throws
Exception {
- // Set collector context so it can prepend columns if needed
- collector.setContext(activeTableArg, activeRow);
+ /** Get state for a specific partition key (test introspection). */
+ public <T> T getStateForKey(String stateName, Row partitionKey) {
Review Comment:
Some of the public state-interaction methods are not mentioned in the
documentation (e.g. `setStateForKey()`, `clearStateForPartition()`, ...).
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -568,6 +617,20 @@ public Builder<OUT> withScalarArgument(String
argumentName, Object value) {
return this;
}
+ /** Sets initial state for a state parameter. */
+ public Builder<OUT> withInitialStateArgument(
Review Comment:
```suggestion
public Builder<OUT> withInitialStateForKey(
```
(see other suggestions above)
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -821,22 +891,25 @@ private void validateEvalMethodSupported(Method
evalMethod, List<ArgumentInfo> a
* matching data types.
*/
private void validatePartitionConsistency(List<ArgumentInfo>
arguments) {
Review Comment:
The check for the state partition keys could be done in this method, if you
decide to add it.
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -568,6 +617,20 @@ public Builder<OUT> withScalarArgument(String
argumentName, Object value) {
return this;
}
+ /** Sets initial state for a state parameter. */
+ public Builder<OUT> withInitialStateArgument(
+ String stateName, Row partitionKey, Object state) {
+ checkNotNull(stateName, "stateName must not be null");
+ checkNotNull(partitionKey, "partitionKey must not be null");
+ checkNotNull(state, "state must not be null");
+
+ stateArgs
+ .computeIfAbsent(stateName, k -> new
StateArgumentConfiguration())
+ .initialValues
Review Comment:
Not sure if this is done later, but it might make sense to also collect all
partition keys that were used during the builder configuration and check in
`build()` whether their types match the columns of the `PARTITION BY` clauses.
Just an idea; it might also be overkill and not worth the effort.
WDYT?
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ListViewStateConverter.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.dataview.ListView;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.types.logical.ArrayType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Converter for ListView state.
+ *
+ * <p>Converts between external ListView objects and internal ArrayData
representation.
+ */
+@Internal
+class ListViewStateConverter implements StateConverter {
Review Comment:
Not sure if it makes sense, but this class does a lot of casting and only
accepts/returns generic `Object` types.
Would it be possible to make more use of generics in this class (and in
`StateConverter` in general)?
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -940,17 +1016,100 @@ private List<ArgumentInfo>
extractAndValidateTypeInference(
if (isScalar || isTableArg) {
ArgumentInfo argInfo = buildArgumentInfo(staticArg);
- arguments.add(argInfo);
+ tableAndScalarArguments.add(argInfo);
} else {
throw new IllegalStateException(
"Unknown argument type for StaticArgument. "
+ "Expected SCALAR, ROW_SEMANTIC_TABLE, or
SET_SEMANTIC_TABLE trait.");
}
}
- validateArgumentConfiguration(arguments);
+ validateArgumentConfiguration(tableAndScalarArguments);
+
+ // Extract state arguments from TypeInference
+ List<StateArgumentInfo> stateArguments = new ArrayList<>();
+
+ Map<String, StateTypeStrategy> stateStrategies =
+ systemTypeInference.getStateTypeStrategies();
+
+ DataTypeFactory dataTypeFactory = createDataTypeFactory();
- return arguments;
+ List<TableArgumentInfo> tableArgs =
+ ArgumentInfo.filterTableArguments(tableAndScalarArguments);
+ List<DataType> argumentDataTypes = new ArrayList<>();
+ for (TableArgumentInfo tArg : tableArgs) {
+ argumentDataTypes.add(tArg.dataType);
+ }
+ Map<Integer, TableSemantics> tableSemanticsMap = new HashMap<>();
+ for (int i = 0; i < tableArgs.size(); i++) {
+ TableArgumentInfo tArg = tableArgs.get(i);
+ int[] partitionIndices = getPartitionColumnIndices(tArg);
+ tableSemanticsMap.put(
+ i, new TestHarnessTableSemantics(tArg.dataType,
partitionIndices));
+ }
+
+ TestHarnessCallContext callContext = new TestHarnessCallContext();
+ callContext.typeFactory = dataTypeFactory;
+ callContext.argumentDataTypes = argumentDataTypes;
+ callContext.tableSemantics = tableSemanticsMap;
+ callContext.functionDefinition = function;
+ callContext.name = function.getClass().getSimpleName();
+
+ for (Map.Entry<String, StateTypeStrategy> entry :
stateStrategies.entrySet()) {
+ String stateName = entry.getKey();
+ StateTypeStrategy strategy = entry.getValue();
+
+ Optional<DataType> dataTypeOpt =
strategy.inferType(callContext);
+ if (dataTypeOpt.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "Could not infer data type for state
parameter '%s'",
+ stateName));
+ }
+ DataType stateDataType = dataTypeOpt.get();
+
+ Optional<Duration> ttlOpt =
strategy.getTimeToLive(callContext);
+ stateArguments.add(
+ new StateArgumentInfo(stateName, stateDataType,
ttlOpt.orElse(null)));
+ }
+
+ List<ArgumentInfo> allArguments = new ArrayList<>();
+ allArguments.addAll(stateArguments);
+ allArguments.addAll(tableAndScalarArguments);
+
+ return allArguments;
+ }
+
+ /** Creates appropriate StateConverter for the given state data type.
*/
+ private StateConverter createStateConverter(DataType stateDataType,
ClassLoader classLoader)
+ throws Exception {
+ LogicalType logicalType = stateDataType.getLogicalType();
+
+ if (logicalType instanceof ArrayType) {
+ ArrayType arrayType = (ArrayType) logicalType;
+ DataType elementType = stateDataType.getChildren().get(0);
+ DataStructureConverter<Object, Object> elementConverter =
+ DataStructureConverters.getConverter(elementType);
+ elementConverter.open(classLoader);
+ return new ListViewStateConverter(arrayType, elementConverter);
+ } else if (logicalType instanceof MapType) {
+ MapType mapType = (MapType) logicalType;
+ DataType keyType = stateDataType.getChildren().get(0);
+ DataType valueType = stateDataType.getChildren().get(1);
+ DataStructureConverter<Object, Object> keyConverter =
+ DataStructureConverters.getConverter(keyType);
+ DataStructureConverter<Object, Object> valueConverter =
+ DataStructureConverters.getConverter(valueType);
+ keyConverter.open(classLoader);
+ valueConverter.open(classLoader);
+ return new MapViewStateConverter(mapType, keyConverter,
valueConverter);
+ } else {
Review Comment:
Does this `else` branch also cover `Row` state, as described in the third
example at
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/functions/ptfs/#state
?
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/StateConverter.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Converter between external state representations (ListView, MapView &
structured types) and
Review Comment:
`Row` is supported as a state type as well, so it should be listed here too.
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -1062,4 +1110,309 @@ void testPartitionByDuplicateConfigThrows() {
assertThat(exception.getMessage()).contains("Partition config already
exists");
}
+
+ //
-------------------------------------------------------------------------
+ // State Tests
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testPojoState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ assertThat(harness.getOutput()).containsExactly(Row.of("Alice", 1L));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(1L);
+
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("Alice", 2L));
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStatePartitionIsolation() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState aliceState =
harness.getStateForKey("state", Row.of("Alice"));
+ PTFWithPojoState.CounterState bobState =
harness.getStateForKey("state", Row.of("Bob"));
+
+ assertThat(aliceState.counter).isEqualTo(2L);
+ assertThat(bobState.counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStateWithInitialState() throws Exception {
+ PTFWithPojoState.CounterState initialState = new
PTFWithPojoState.CounterState();
+ initialState.counter = 100L;
+
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<id
INT>"))
+ .withPartitionBy("input", "id")
+ .withInitialStateArgument("state", Row.of(1),
initialState)
+ .build();
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of(1));
+ assertThat(state.counter).isEqualTo(100L);
+
+ harness.processElement(Row.of(1));
+ assertThat(harness.getOutput()).containsExactly(Row.of(1, 101L));
+
+ harness.processElement(Row.of(2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of(2, 1L));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetStateKeys() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Charlie", 30));
+
+ java.util.Set<Row> keys = harness.getStateKeys("state");
+ assertThat(keys)
+ .containsExactlyInAnyOrder(Row.of("Alice"), Row.of("Bob"),
Row.of("Charlie"));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetAllState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+
+ java.util.Map<Row, PTFWithPojoState.CounterState> allState =
harness.getAllState("state");
+
+ assertThat(allState).hasSize(2);
+ assertThat(allState.get(Row.of("Alice")).counter).isEqualTo(2L);
+ assertThat(allState.get(Row.of("Bob")).counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
Review Comment:
Should we add a test for `harness.setStateForKey()`?
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessStateManager.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.Row;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * State manager for {@link ProcessTableFunctionTestHarness}.
+ *
+ * <p>Handles state storage, lifecycle, and conversion between external and
internal storage
+ * formats.
+ */
+@Internal
+class TestHarnessStateManager {
+
+ private final Map<Row, Map<String, Object>> stateByPartition = new
HashMap<>();
+ private final List<ProcessTableFunctionTestHarness.StateArgumentInfo>
stateArguments;
+ private final Map<String, StateConverter> stateConverters;
+
+ TestHarnessStateManager(
+ List<ProcessTableFunctionTestHarness.StateArgumentInfo>
stateArguments,
+ Map<String, StateConverter> stateConverters) {
+ this.stateArguments = stateArguments;
+ this.stateConverters = stateConverters;
+ }
+
+ /**
+ * Load state for a partition key. Creates new state instances if none
exist. Converts internal
+ * storage to external objects (POJOs, ListView, MapView).
+ */
+ Map<String, Object> loadStateForPartition(Row partitionKey) {
+ Map<String, Object> internalState =
+ stateByPartition.computeIfAbsent(partitionKey, k ->
createNewPartitionState());
+
+ Map<String, Object> externalState = new HashMap<>();
+ for (ProcessTableFunctionTestHarness.StateArgumentInfo stateArg :
stateArguments) {
+ Object internalData = internalState.get(stateArg.name);
+ Object external = convertToExternal(internalData, stateArg);
+ externalState.put(stateArg.name, external);
+ }
+ return externalState;
+ }
+
+ /**
+ * Update mutated state after eval() invocation. Converts external objects
to internal format.
+ */
+ void updateStateForPartition(Row partitionKey, Map<String, Object>
externalState)
+ throws Exception {
+ Map<String, Object> internalState = new HashMap<>();
+ for (ProcessTableFunctionTestHarness.StateArgumentInfo stateArg :
stateArguments) {
+ Object external = externalState.get(stateArg.name);
+ Object internalData = convertToInternal(external, stateArg);
+ internalState.put(stateArg.name, internalData);
+ }
+ stateByPartition.put(partitionKey, internalState);
+ }
+
+ /** Clear all state for a partition. */
+ void clearStateForPartition(Row partitionKey) {
+ stateByPartition.remove(partitionKey);
+ }
+
+ /** Clear specific state entry for a given partition, resetting it to its
default value. */
+ void clearStateEntry(Row partitionKey, String stateName) {
+ Map<String, Object> internalState = stateByPartition.get(partitionKey);
+ if (internalState != null) {
+ ProcessTableFunctionTestHarness.StateArgumentInfo stateArg =
+ findStateArgument(stateName);
+ internalState.put(stateName, createNewStateInternalData(stateArg));
+ }
+ }
+
+ /** Set initial state for a given partition. */
+ void setInitialState(String stateName, Row partitionKey, Object
externalState)
Review Comment:
```suggestion
void setStateForKey(String stateName, Row partitionKey, Object
externalState)
```
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -1062,4 +1110,309 @@ void testPartitionByDuplicateConfigThrows() {
assertThat(exception.getMessage()).contains("Partition config already
exists");
}
+
+ //
-------------------------------------------------------------------------
+ // State Tests
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testPojoState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ assertThat(harness.getOutput()).containsExactly(Row.of("Alice", 1L));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(1L);
+
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("Alice", 2L));
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStatePartitionIsolation() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState aliceState =
harness.getStateForKey("state", Row.of("Alice"));
+ PTFWithPojoState.CounterState bobState =
harness.getStateForKey("state", Row.of("Bob"));
+
+ assertThat(aliceState.counter).isEqualTo(2L);
+ assertThat(bobState.counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStateWithInitialState() throws Exception {
+ PTFWithPojoState.CounterState initialState = new
PTFWithPojoState.CounterState();
+ initialState.counter = 100L;
+
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<id
INT>"))
+ .withPartitionBy("input", "id")
+ .withInitialStateArgument("state", Row.of(1),
initialState)
+ .build();
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of(1));
+ assertThat(state.counter).isEqualTo(100L);
+
+ harness.processElement(Row.of(1));
+ assertThat(harness.getOutput()).containsExactly(Row.of(1, 101L));
+
+ harness.processElement(Row.of(2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of(2, 1L));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetStateKeys() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Charlie", 30));
+
+ java.util.Set<Row> keys = harness.getStateKeys("state");
+ assertThat(keys)
+ .containsExactlyInAnyOrder(Row.of("Alice"), Row.of("Bob"),
Row.of("Charlie"));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetAllState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+
+ java.util.Map<Row, PTFWithPojoState.CounterState> allState =
harness.getAllState("state");
+
+ assertThat(allState).hasSize(2);
+ assertThat(allState.get(Row.of("Alice")).counter).isEqualTo(2L);
+ assertThat(allState.get(Row.of("Bob")).counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testListViewState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithListViewState.class)
+ .withTableArgument("input", DataTypes.of("ROW<key
STRING, value INT>"))
+ .withPartitionBy("input", "key")
+ .build();
+
+ harness.processElementForTable("input", Row.of("A", 1));
+ assertThat(harness.getOutput()).containsExactly(Row.of("A", new
Integer[] {1}));
+
+ harness.processElementForTable("input", Row.of("A", 2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("A", new
Integer[] {1, 2}));
+
+ org.apache.flink.table.api.dataview.ListView<Integer> listState =
Review Comment:
```suggestion
ListView<Integer> listState =
```
The fully qualified `org.apache.flink.table.api.dataview.` prefix can be removed by importing `ListView` instead?
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -1062,4 +1110,309 @@ void testPartitionByDuplicateConfigThrows() {
assertThat(exception.getMessage()).contains("Partition config already
exists");
}
+
+ //
-------------------------------------------------------------------------
+ // State Tests
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testPojoState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ assertThat(harness.getOutput()).containsExactly(Row.of("Alice", 1L));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(1L);
+
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("Alice", 2L));
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStatePartitionIsolation() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState aliceState =
harness.getStateForKey("state", Row.of("Alice"));
+ PTFWithPojoState.CounterState bobState =
harness.getStateForKey("state", Row.of("Bob"));
+
+ assertThat(aliceState.counter).isEqualTo(2L);
+ assertThat(bobState.counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStateWithInitialState() throws Exception {
+ PTFWithPojoState.CounterState initialState = new
PTFWithPojoState.CounterState();
+ initialState.counter = 100L;
+
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<id
INT>"))
+ .withPartitionBy("input", "id")
+ .withInitialStateArgument("state", Row.of(1),
initialState)
+ .build();
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of(1));
+ assertThat(state.counter).isEqualTo(100L);
+
+ harness.processElement(Row.of(1));
+ assertThat(harness.getOutput()).containsExactly(Row.of(1, 101L));
+
+ harness.processElement(Row.of(2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of(2, 1L));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetStateKeys() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Charlie", 30));
+
+ java.util.Set<Row> keys = harness.getStateKeys("state");
+ assertThat(keys)
+ .containsExactlyInAnyOrder(Row.of("Alice"), Row.of("Bob"),
Row.of("Charlie"));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetAllState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+
+ java.util.Map<Row, PTFWithPojoState.CounterState> allState =
harness.getAllState("state");
+
+ assertThat(allState).hasSize(2);
+ assertThat(allState.get(Row.of("Alice")).counter).isEqualTo(2L);
+ assertThat(allState.get(Row.of("Bob")).counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testListViewState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithListViewState.class)
+ .withTableArgument("input", DataTypes.of("ROW<key
STRING, value INT>"))
+ .withPartitionBy("input", "key")
+ .build();
+
+ harness.processElementForTable("input", Row.of("A", 1));
+ assertThat(harness.getOutput()).containsExactly(Row.of("A", new
Integer[] {1}));
+
+ harness.processElementForTable("input", Row.of("A", 2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("A", new
Integer[] {1, 2}));
+
+ org.apache.flink.table.api.dataview.ListView<Integer> listState =
+ harness.getStateForKey("listState", Row.of("A"));
+ assertThat(listState.get()).containsExactly(1, 2);
+
+ harness.close();
+ }
+
+ @Test
+ void testMapViewState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithMapViewState.class)
+ .withTableArgument(
+ "input", DataTypes.of("ROW<partition STRING,
key STRING>"))
+ .withPartitionBy("input", "partition")
+ .build();
+
+ harness.processElementForTable("input", Row.of("P1", "foo"));
+ assertThat(harness.getOutput()).containsExactly(Row.of("P1", "foo",
1));
+
+ harness.processElementForTable("input", Row.of("P1", "foo"));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("P1", "foo",
2));
+
+ harness.processElementForTable("input", Row.of("P1", "bar"));
+
+ org.apache.flink.table.api.dataview.MapView<String, Integer> mapState =
+ harness.getStateForKey("mapState", Row.of("P1"));
+ assertThat(mapState.get("foo")).isEqualTo(2);
+ assertThat(mapState.get("bar")).isEqualTo(1);
+
+ harness.close();
+ }
+
+ @Test
+ void testEmptyState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+
+ assertThat(state).isNull();
+
+ harness.close();
+ }
+
+ @Test
+ void testClearStateForPartition() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.clearStateForPartition(Row.of("Alice"));
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state).isNull();
+
Review Comment:
Add another `processElementForTable()` call and an output assertion to check
that processing can continue after the state has been cleared?
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -1062,4 +1110,309 @@ void testPartitionByDuplicateConfigThrows() {
assertThat(exception.getMessage()).contains("Partition config already
exists");
}
+
+ //
-------------------------------------------------------------------------
+ // State Tests
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testPojoState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ assertThat(harness.getOutput()).containsExactly(Row.of("Alice", 1L));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(1L);
+
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("Alice", 2L));
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStatePartitionIsolation() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState aliceState =
harness.getStateForKey("state", Row.of("Alice"));
+ PTFWithPojoState.CounterState bobState =
harness.getStateForKey("state", Row.of("Bob"));
+
+ assertThat(aliceState.counter).isEqualTo(2L);
+ assertThat(bobState.counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStateWithInitialState() throws Exception {
+ PTFWithPojoState.CounterState initialState = new
PTFWithPojoState.CounterState();
+ initialState.counter = 100L;
+
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<id
INT>"))
+ .withPartitionBy("input", "id")
+ .withInitialStateArgument("state", Row.of(1),
initialState)
+ .build();
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of(1));
+ assertThat(state.counter).isEqualTo(100L);
+
+ harness.processElement(Row.of(1));
+ assertThat(harness.getOutput()).containsExactly(Row.of(1, 101L));
+
+ harness.processElement(Row.of(2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of(2, 1L));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetStateKeys() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Charlie", 30));
+
+ java.util.Set<Row> keys = harness.getStateKeys("state");
+ assertThat(keys)
+ .containsExactlyInAnyOrder(Row.of("Alice"), Row.of("Bob"),
Row.of("Charlie"));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetAllState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+
+ java.util.Map<Row, PTFWithPojoState.CounterState> allState =
harness.getAllState("state");
+
+ assertThat(allState).hasSize(2);
+ assertThat(allState.get(Row.of("Alice")).counter).isEqualTo(2L);
+ assertThat(allState.get(Row.of("Bob")).counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testListViewState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithListViewState.class)
+ .withTableArgument("input", DataTypes.of("ROW<key
STRING, value INT>"))
+ .withPartitionBy("input", "key")
+ .build();
+
+ harness.processElementForTable("input", Row.of("A", 1));
+ assertThat(harness.getOutput()).containsExactly(Row.of("A", new
Integer[] {1}));
+
+ harness.processElementForTable("input", Row.of("A", 2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("A", new
Integer[] {1, 2}));
+
+ org.apache.flink.table.api.dataview.ListView<Integer> listState =
+ harness.getStateForKey("listState", Row.of("A"));
+ assertThat(listState.get()).containsExactly(1, 2);
+
+ harness.close();
+ }
+
+ @Test
+ void testMapViewState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithMapViewState.class)
+ .withTableArgument(
+ "input", DataTypes.of("ROW<partition STRING,
key STRING>"))
+ .withPartitionBy("input", "partition")
+ .build();
+
+ harness.processElementForTable("input", Row.of("P1", "foo"));
+ assertThat(harness.getOutput()).containsExactly(Row.of("P1", "foo",
1));
+
+ harness.processElementForTable("input", Row.of("P1", "foo"));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("P1", "foo",
2));
+
+ harness.processElementForTable("input", Row.of("P1", "bar"));
Review Comment:
Also add an assertion for the output of this `processElementForTable()` call?
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -1062,4 +1110,309 @@ void testPartitionByDuplicateConfigThrows() {
assertThat(exception.getMessage()).contains("Partition config already
exists");
}
+
+ //
-------------------------------------------------------------------------
+ // State Tests
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testPojoState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ assertThat(harness.getOutput()).containsExactly(Row.of("Alice", 1L));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(1L);
+
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("Alice", 2L));
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStatePartitionIsolation() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState aliceState =
harness.getStateForKey("state", Row.of("Alice"));
+ PTFWithPojoState.CounterState bobState =
harness.getStateForKey("state", Row.of("Bob"));
+
+ assertThat(aliceState.counter).isEqualTo(2L);
+ assertThat(bobState.counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStateWithInitialState() throws Exception {
+ PTFWithPojoState.CounterState initialState = new
PTFWithPojoState.CounterState();
+ initialState.counter = 100L;
+
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<id
INT>"))
+ .withPartitionBy("input", "id")
+ .withInitialStateArgument("state", Row.of(1),
initialState)
+ .build();
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of(1));
+ assertThat(state.counter).isEqualTo(100L);
+
+ harness.processElement(Row.of(1));
+ assertThat(harness.getOutput()).containsExactly(Row.of(1, 101L));
+
+ harness.processElement(Row.of(2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of(2, 1L));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetStateKeys() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Charlie", 30));
+
+ java.util.Set<Row> keys = harness.getStateKeys("state");
+ assertThat(keys)
+ .containsExactlyInAnyOrder(Row.of("Alice"), Row.of("Bob"),
Row.of("Charlie"));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetAllState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+
+ java.util.Map<Row, PTFWithPojoState.CounterState> allState =
harness.getAllState("state");
+
+ assertThat(allState).hasSize(2);
+ assertThat(allState.get(Row.of("Alice")).counter).isEqualTo(2L);
+ assertThat(allState.get(Row.of("Bob")).counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testListViewState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithListViewState.class)
+ .withTableArgument("input", DataTypes.of("ROW<key
STRING, value INT>"))
+ .withPartitionBy("input", "key")
+ .build();
+
+ harness.processElementForTable("input", Row.of("A", 1));
+ assertThat(harness.getOutput()).containsExactly(Row.of("A", new
Integer[] {1}));
+
+ harness.processElementForTable("input", Row.of("A", 2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("A", new
Integer[] {1, 2}));
+
+ org.apache.flink.table.api.dataview.ListView<Integer> listState =
+ harness.getStateForKey("listState", Row.of("A"));
+ assertThat(listState.get()).containsExactly(1, 2);
+
+ harness.close();
+ }
+
+ @Test
+ void testMapViewState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithMapViewState.class)
+ .withTableArgument(
+ "input", DataTypes.of("ROW<partition STRING,
key STRING>"))
+ .withPartitionBy("input", "partition")
+ .build();
+
+ harness.processElementForTable("input", Row.of("P1", "foo"));
+ assertThat(harness.getOutput()).containsExactly(Row.of("P1", "foo",
1));
+
+ harness.processElementForTable("input", Row.of("P1", "foo"));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("P1", "foo",
2));
+
+ harness.processElementForTable("input", Row.of("P1", "bar"));
+
+ org.apache.flink.table.api.dataview.MapView<String, Integer> mapState =
+ harness.getStateForKey("mapState", Row.of("P1"));
+ assertThat(mapState.get("foo")).isEqualTo(2);
+ assertThat(mapState.get("bar")).isEqualTo(1);
+
+ harness.close();
+ }
+
+ @Test
+ void testEmptyState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+
+ assertThat(state).isNull();
+
+ harness.close();
+ }
+
+ @Test
+ void testClearStateForPartition() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.clearStateForPartition(Row.of("Alice"));
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state).isNull();
+
+ harness.close();
+ }
+
+ @Test
+ void testClearStateEntry() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.clearStateEntry(Row.of("Alice"), "state");
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(0L);
+
Review Comment:
Add another `processElementForTable()` call and an output assertion to check that
processing can continue after the state entry has been cleared?
##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2275,6 +2275,110 @@ void testScalarOnly() throws Exception {
{{< /tab >}}
{{< /tabs >}}
+#### Testing with State
+
+The harness supports structured types, `ListView`, and `MapView`:
+
+{{< tabs "state-testing" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<name STRING, count BIGINT>")
+public class CounterPTF extends ProcessTableFunction<Row> {
+ public static class CountState {
+ public long count = 0L;
+ }
+
+ public void eval(
+ @StateHint CountState state,
+ @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
+ state.count++;
+ String name = input.getFieldAs("name");
+ collect(Row.of(name, state.count));
+ }
+}
+
+@Test
+void testWithState() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(CounterPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build()) {
+
+ harness.processElement(Row.of("Alice", 10));
+ harness.processElement(Row.of("Alice", 20));
+
+ List<Row> output = harness.getOutput();
+ assertThat(output.get(0)).isEqualTo(Row.of("Alice", 1L));
+ assertThat(output.get(1)).isEqualTo(Row.of("Alice", 2L));
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Initial State Setup**: Use `.withInitialStateArgument()` to pre-populate
state before processing:
Review Comment:
It might even make sense to rename the method to something like
`.withInitialStateArgumentForKey()` (or `withInitialStateArgForKey()`,
`withInitialStateForKey()`, ...) to make clear that the state is set for a specific partition key?
##########
docs/content.zh/docs/dev/table/functions/ptfs.md:
##########
@@ -2275,6 +2275,110 @@ void testScalarOnly() throws Exception {
{{< /tab >}}
{{< /tabs >}}
+#### Testing with State
+
+The harness supports structured types, `ListView`, and `MapView`:
+
+{{< tabs "state-testing" >}}
+{{< tab "Java" >}}
+```java
+@DataTypeHint("ROW<name STRING, count BIGINT>")
+public class CounterPTF extends ProcessTableFunction<Row> {
+ public static class CountState {
+ public long count = 0L;
+ }
+
+ public void eval(
+ @StateHint CountState state,
+ @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
+ state.count++;
+ String name = input.getFieldAs("name");
+ collect(Row.of(name, state.count));
+ }
+}
+
+@Test
+void testWithState() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(CounterPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build()) {
+
+ harness.processElement(Row.of("Alice", 10));
+ harness.processElement(Row.of("Alice", 20));
+
+ List<Row> output = harness.getOutput();
+ assertThat(output.get(0)).isEqualTo(Row.of("Alice", 1L));
+ assertThat(output.get(1)).isEqualTo(Row.of("Alice", 2L));
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**Initial State Setup**: Use `.withInitialStateArgument()` to pre-populate
state before processing:
+
+{{< tabs "initial-state" >}}
+{{< tab "Java" >}}
+```java
+@Test
+void testWithInitialState() throws Exception {
+ CounterPTF.CountState initialState = new CounterPTF.CountState();
+ initialState.count = 100L;
+
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(CounterPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .withInitialStateArgument("state", Row.of("Alice"), initialState)
+ .build()) {
+
+ harness.processElement(Row.of("Alice", 10));
+
+ List<Row> output = harness.getOutput();
+ assertThat(output).containsExactly(Row.of("Alice", 101L));
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+**State Introspection**: Use `getStateForKey()`, `getStateKeys()`, and
`getAllState()` to inspect state during tests:
+
+{{< tabs "state-introspection" >}}
+{{< tab "Java" >}}
+```java
+@Test
+void testStateIntrospection() throws Exception {
+ try (ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(CounterPTF.class)
+ .withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build()) {
+
+ harness.processElement(Row.of("Alice", 10));
+ harness.processElement(Row.of("Bob", 20));
+
+ // Check specific partition state
+ CounterPTF.CountState aliceState =
+ harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(aliceState.count).isEqualTo(1L);
+
+ // Get all partition keys with state
+ Set<Row> keys = harness.getStateKeys("state");
+ assertThat(keys).containsExactlyInAnyOrder(Row.of("Alice"), Row.of("Bob"));
+
+ // Get all state across partitions
+ Map<Row, CounterPTF.CountState> allState =
+ harness.getAllState("state");
+ assertThat(allState.get(Row.of("Bob")).count).isEqualTo(1L);
+ }
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
Review Comment:
Also add docs on how to assert `ListView` and `MapView` state with the harness?
Or is it identical to asserting structured-type (value) state?
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -323,6 +351,19 @@ private void invokeEval(ArgumentInfo activeTableArg, Row
activeRow) throws Excep
}
}
+ private Row extractPartitionKey(TableArgumentInfo tableArg, Row row) {
+ if (tableArg.partitionColumnNames == null ||
tableArg.partitionColumnNames.length == 0) {
+ return Row.of();
+ }
+
+ Object[] keyValues = new Object[tableArg.partitionColumnNames.length];
+ for (int i = 0; i < tableArg.partitionColumnNames.length; i++) {
+ String colName = tableArg.partitionColumnNames[i];
+ keyValues[i] = row.getField(colName);
+ }
Review Comment:
```suggestion
Object[] keyValues = Arrays.stream(tableArg.partitionColumnNames)
.map(colName -> row.getField(colName))
.toArray();
```
?
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -270,43 +272,69 @@ public void clearOutput() {
output.clear();
}
- /**
- * Given a target table argument and a row to process, construct the right
set of arguments for
- * the PTF's eval function and attempt to invoke it.
- */
- private void invokeEval(ArgumentInfo activeTableArg, Row activeRow) throws
Exception {
- // Set collector context so it can prepend columns if needed
- collector.setContext(activeTableArg, activeRow);
+ /** Get state for a specific partition key (test introspection). */
+ public <T> T getStateForKey(String stateName, Row partitionKey) {
+ return stateManager.getStateForKey(stateName, partitionKey);
+ }
- Object[] args = new Object[arguments.size()];
+ /** Set state for a specific partition key (test introspection). */
+ public void setStateForKey(String stateName, Row partitionKey, Object
state) throws Exception {
+ stateManager.setInitialState(stateName, partitionKey, state);
+ }
- for (int i = 0; i < arguments.size(); i++) {
- ArgumentInfo arg = arguments.get(i);
+ /** Get all partition keys that have a specific state entry (test
introspection). */
+ public Set<Row> getStateKeys(String stateName) {
+ return stateManager.getStateKeys(stateName);
+ }
- if (arg.isTableArgument && arg.name.equals(activeTableArg.name)) {
- // If the argument is the active table argument, first convert
the input row
- // to an internal RowData type, and then convert the RowData
to type that the
- // argument expects. For Rows, this will structure the Row
based on the table
- // argument structure. Otherwise, for POJOs, it will pass the
expected POJO to eval.
+ /** Get all state values for a state name across all partitions. */
+ public <T> Map<Row, T> getAllState(String stateName) {
+ return stateManager.getAllState(stateName);
+ }
- ConverterPair pair = argumentConverters.get(arg.name);
+ /** Clear all state for a given partition. */
+ public void clearStateForPartition(Row partitionKey) {
+ stateManager.clearStateForPartition(partitionKey);
+ }
- args[i] =
pair.output.toExternalOrNull(pair.input.toInternalOrNull(activeRow));
+ /** Clear specific state entry for a given partition. */
+ public void clearStateEntry(Row partitionKey, String stateName) {
+ stateManager.clearStateEntry(partitionKey, stateName);
Review Comment:
```suggestion
public void clearStateForKey(String stateName, Row partitionKey) {
stateManager.clearStateForKey(stateName, partitionKey);
```
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -1252,31 +1399,75 @@ private void handleEvalInvocationException(
}
/**
- * Metadata for a single argument extracted from type inference.
+ * Base class for PTF eval() arguments.
*
* <p>Represents validated argument information combining PTF signature,
type inference results,
* and builder configuration.
*/
- private static class ArgumentInfo {
+ private abstract static class ArgumentInfo {
final String name;
final DataType dataType;
+
+ ArgumentInfo(String name, DataType dataType) {
+ this.name = name;
+ this.dataType = dataType;
+ }
+
+ static List<StateArgumentInfo> filterStateArguments(List<ArgumentInfo>
arguments) {
+ List<StateArgumentInfo> result = new ArrayList<>();
+ for (ArgumentInfo arg : arguments) {
+ if (arg instanceof StateArgumentInfo) {
+ result.add((StateArgumentInfo) arg);
+ }
+ }
+ return result;
+ }
+
+ static List<TableArgumentInfo> filterTableArguments(List<ArgumentInfo>
arguments) {
+ List<TableArgumentInfo> result = new ArrayList<>();
+ for (ArgumentInfo arg : arguments) {
+ if (arg instanceof TableArgumentInfo) {
+ result.add((TableArgumentInfo) arg);
+ }
+ }
+ return result;
+ }
+
+ static List<ScalarArgumentInfo>
filterScalarArguments(List<ArgumentInfo> arguments) {
+ List<ScalarArgumentInfo> result = new ArrayList<>();
+ for (ArgumentInfo arg : arguments) {
+ if (arg instanceof ScalarArgumentInfo) {
+ result.add((ScalarArgumentInfo) arg);
+ }
+ }
+ return result;
+ }
+ }
+
+ /** State parameter with TTL configuration. */
+ static class StateArgumentInfo extends ArgumentInfo {
+ final Duration ttl;
Review Comment:
TTL is not supported yet.
Do you want to remove it for now?
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -1252,31 +1399,75 @@ private void handleEvalInvocationException(
}
/**
- * Metadata for a single argument extracted from type inference.
+ * Base class for PTF eval() arguments.
*
* <p>Represents validated argument information combining PTF signature,
type inference results,
* and builder configuration.
*/
- private static class ArgumentInfo {
+ private abstract static class ArgumentInfo {
final String name;
final DataType dataType;
+
+ ArgumentInfo(String name, DataType dataType) {
+ this.name = name;
+ this.dataType = dataType;
+ }
+
+ static List<StateArgumentInfo> filterStateArguments(List<ArgumentInfo>
arguments) {
+ List<StateArgumentInfo> result = new ArrayList<>();
+ for (ArgumentInfo arg : arguments) {
+ if (arg instanceof StateArgumentInfo) {
+ result.add((StateArgumentInfo) arg);
+ }
+ }
+ return result;
Review Comment:
A bit shorter:
```suggestion
return arguments.stream()
.filter(arg -> arg instanceof StateArgumentInfo)
.map(arg -> (StateArgumentInfo) arg)
.toList();
```
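The same can be done for the other filterXArguments() methods below. (Note: `Stream.toList()` requires Java 16+; if this module must still compile on Java 11, use `.collect(Collectors.toList())` instead.)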
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessStateManager.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.Row;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * State manager for {@link ProcessTableFunctionTestHarness}.
+ *
+ * <p>Handles state storage, lifecycle, and conversion between external and
internal storage
+ * formats.
+ */
+@Internal
+class TestHarnessStateManager {
+
+ private final Map<Row, Map<String, Object>> stateByPartition = new
HashMap<>();
+ private final List<ProcessTableFunctionTestHarness.StateArgumentInfo>
stateArguments;
+ private final Map<String, StateConverter> stateConverters;
+
+ TestHarnessStateManager(
+ List<ProcessTableFunctionTestHarness.StateArgumentInfo>
stateArguments,
+ Map<String, StateConverter> stateConverters) {
+ this.stateArguments = stateArguments;
+ this.stateConverters = stateConverters;
+ }
+
+ /**
+ * Load state for a partition key. Creates new state instances if none
exist. Converts internal
+ * storage to external objects (POJOs, ListView, MapView).
+ */
+ Map<String, Object> loadStateForPartition(Row partitionKey) {
+ Map<String, Object> internalState =
+ stateByPartition.computeIfAbsent(partitionKey, k ->
createNewPartitionState());
+
+ Map<String, Object> externalState = new HashMap<>();
+ for (ProcessTableFunctionTestHarness.StateArgumentInfo stateArg :
stateArguments) {
+ Object internalData = internalState.get(stateArg.name);
+ Object external = convertToExternal(internalData, stateArg);
+ externalState.put(stateArg.name, external);
+ }
+ return externalState;
+ }
+
+ /**
+ * Update mutated state after eval() invocation. Converts external objects
to internal format.
+ */
+ void updateStateForPartition(Row partitionKey, Map<String, Object>
externalState)
+ throws Exception {
+ Map<String, Object> internalState = new HashMap<>();
+ for (ProcessTableFunctionTestHarness.StateArgumentInfo stateArg :
stateArguments) {
+ Object external = externalState.get(stateArg.name);
+ Object internalData = convertToInternal(external, stateArg);
+ internalState.put(stateArg.name, internalData);
+ }
+ stateByPartition.put(partitionKey, internalState);
+ }
+
+ /** Clear all state for a partition. */
+ void clearStateForPartition(Row partitionKey) {
+ stateByPartition.remove(partitionKey);
+ }
+
+ /** Clear specific state entry for a given partition, resetting it to its
default value. */
+ void clearStateEntry(Row partitionKey, String stateName) {
+ Map<String, Object> internalState = stateByPartition.get(partitionKey);
+ if (internalState != null) {
+ ProcessTableFunctionTestHarness.StateArgumentInfo stateArg =
+ findStateArgument(stateName);
+ internalState.put(stateName, createNewStateInternalData(stateArg));
+ }
+ }
+
+ /** Set initial state for a given partition. */
+ void setInitialState(String stateName, Row partitionKey, Object
externalState)
+ throws Exception {
+ ProcessTableFunctionTestHarness.StateArgumentInfo stateArg =
findStateArgument(stateName);
+ Object internalData = convertToInternal(externalState, stateArg);
+
+ Map<String, Object> internalState =
+ stateByPartition.computeIfAbsent(partitionKey, k ->
createNewPartitionState());
+ internalState.put(stateName, internalData);
+ }
+
+ /** Get the state for given partition. */
+ @SuppressWarnings("unchecked")
+ <T> T getStateForKey(String stateName, Row partitionKey) {
+ Map<String, Object> internalState = stateByPartition.get(partitionKey);
+ if (internalState == null) {
+ return null;
+ }
+ Object internalData = internalState.get(stateName);
+ if (internalData == null) {
+ return null;
+ }
+ return (T) convertToExternal(internalData,
findStateArgument(stateName));
+ }
+
+ /** Get all partition keys that have a specific state entry. */
+ Set<Row> getStateKeys(String stateName) {
+ return stateByPartition.entrySet().stream()
+ .filter(entry -> entry.getValue().containsKey(stateName))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ }
+
+ /** Get all state values for a state name across all partitions. */
+ @SuppressWarnings("unchecked")
+ <T> Map<Row, T> getAllState(String stateName) {
+ ProcessTableFunctionTestHarness.StateArgumentInfo stateArg =
findStateArgument(stateName);
+ Map<Row, T> result = new HashMap<>();
+ for (Map.Entry<Row, Map<String, Object>> entry :
stateByPartition.entrySet()) {
+ Object internalData = entry.getValue().get(stateName);
+ if (internalData != null) {
+ result.put(entry.getKey(), (T) convertToExternal(internalData,
stateArg));
+ }
+ }
+ return result;
+ }
+
+ private Map<String, Object> createNewPartitionState() {
Review Comment:
```suggestion
private Map<String, Object> createEmptyPartitionState() {
```
Just a suggestion.
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessStateManager.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.Row;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * State manager for {@link ProcessTableFunctionTestHarness}.
+ *
+ * <p>Handles state storage, lifecycle, and conversion between external and
internal storage
+ * formats.
+ */
+@Internal
+class TestHarnessStateManager {
+
+ private final Map<Row, Map<String, Object>> stateByPartition = new
HashMap<>();
+ private final List<ProcessTableFunctionTestHarness.StateArgumentInfo>
stateArguments;
+ private final Map<String, StateConverter> stateConverters;
+
+ TestHarnessStateManager(
+ List<ProcessTableFunctionTestHarness.StateArgumentInfo>
stateArguments,
+ Map<String, StateConverter> stateConverters) {
+ this.stateArguments = stateArguments;
+ this.stateConverters = stateConverters;
+ }
+
+ /**
+ * Load state for a partition key. Creates new state instances if none
exist. Converts internal
+ * storage to external objects (POJOs, ListView, MapView).
+ */
+ Map<String, Object> loadStateForPartition(Row partitionKey) {
+ Map<String, Object> internalState =
+ stateByPartition.computeIfAbsent(partitionKey, k ->
createNewPartitionState());
+
+ Map<String, Object> externalState = new HashMap<>();
+ for (ProcessTableFunctionTestHarness.StateArgumentInfo stateArg :
stateArguments) {
+ Object internalData = internalState.get(stateArg.name);
+ Object external = convertToExternal(internalData, stateArg);
+ externalState.put(stateArg.name, external);
+ }
+ return externalState;
+ }
+
+ /**
+ * Update mutated state after eval() invocation. Converts external objects
to internal format.
+ */
+ void updateStateForPartition(Row partitionKey, Map<String, Object>
externalState)
+ throws Exception {
+ Map<String, Object> internalState = new HashMap<>();
+ for (ProcessTableFunctionTestHarness.StateArgumentInfo stateArg :
stateArguments) {
+ Object external = externalState.get(stateArg.name);
+ Object internalData = convertToInternal(external, stateArg);
+ internalState.put(stateArg.name, internalData);
+ }
+ stateByPartition.put(partitionKey, internalState);
+ }
+
+ /** Clear all state for a partition. */
+ void clearStateForPartition(Row partitionKey) {
+ stateByPartition.remove(partitionKey);
+ }
+
+ /** Clear specific state entry for a given partition, resetting it to its
default value. */
+ void clearStateEntry(Row partitionKey, String stateName) {
Review Comment:
```suggestion
void clearStateEntry(String stateName, Row partitionKey) {
```
Swap the argument order for consistency: `getStateForKey` and `setInitialState` take `stateName` first.
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -618,21 +683,40 @@ public ProcessTableFunctionTestHarness<OUT> build()
throws Exception {
List<ArgumentInfo> arguments =
extractAndValidateTypeInference(function,
systemTypeInference);
- FunctionContext functionContext =
- new FunctionContext(null,
Thread.currentThread().getContextClassLoader(), null);
+ FunctionContext functionContext = new FunctionContext(null,
classLoader, null);
Method evalMethod = findEvalMethod();
validateEvalMethodSupported(evalMethod, arguments);
validatePartitionConsistency(arguments);
Review Comment:
Is there a check that PTFs with state require partitioned table args?
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -282,17 +284,79 @@ public void eval(Context ctx,
@ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Ro
}
}
- /** PTF with State parameter - should be rejected by test harness. */
- @DataTypeHint("ROW<value INT>")
- public static class PTFWithState extends ProcessTableFunction<Row> {
- public static class CountState {
+ /** PTF with simple structured type state - counts rows per partition. */
+ @DataTypeHint("ROW<count BIGINT>")
+ public static class PTFWithPojoState extends ProcessTableFunction<Row> {
+ public static class CounterState {
public long counter = 0L;
}
public void eval(
- @StateHint CountState state,
- @ArgumentHint(ArgumentTrait.ROW_SEMANTIC_TABLE) Row input) {
- collect(input);
+ @StateHint CounterState state,
+ @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) {
+ state.counter++;
+ collect(Row.of(state.counter));
+ }
+ }
+
+ /** PTF with ListView state - accumulates values in a list. */
+ @DataTypeHint("ROW<values ARRAY<INT>>")
+ public static class PTFWithListViewState extends ProcessTableFunction<Row>
{
+ public void eval(
+ @StateHint(type = @DataTypeHint("ARRAY<INT>"))
ListView<Integer> listState,
+ @ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input)
+ throws Exception {
+ Integer value = input.getFieldAs("value");
+ listState.add(value);
+
+ // Collect all values as an array
+ java.util.List<Integer> values = new java.util.ArrayList<>();
Review Comment:
```suggestion
List<Integer> values = new ArrayList<>();
```
Are fully-qualified names really needed here?
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessStateManager.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.Row;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * State manager for {@link ProcessTableFunctionTestHarness}.
+ *
+ * <p>Handles state storage, lifecycle, and conversion between external and
internal storage
+ * formats.
+ */
+@Internal
+class TestHarnessStateManager {
+
+ private final Map<Row, Map<String, Object>> stateByPartition = new
HashMap<>();
+ private final List<ProcessTableFunctionTestHarness.StateArgumentInfo>
stateArguments;
+ private final Map<String, StateConverter> stateConverters;
+
+ TestHarnessStateManager(
+ List<ProcessTableFunctionTestHarness.StateArgumentInfo>
stateArguments,
+ Map<String, StateConverter> stateConverters) {
+ this.stateArguments = stateArguments;
+ this.stateConverters = stateConverters;
+ }
+
+ /**
+ * Load state for a partition key. Creates new state instances if none
exist. Converts internal
+ * storage to external objects (POJOs, ListView, MapView).
+ */
+ Map<String, Object> loadStateForPartition(Row partitionKey) {
+ Map<String, Object> internalState =
+ stateByPartition.computeIfAbsent(partitionKey, k ->
createNewPartitionState());
+
+ Map<String, Object> externalState = new HashMap<>();
+ for (ProcessTableFunctionTestHarness.StateArgumentInfo stateArg :
stateArguments) {
+ Object internalData = internalState.get(stateArg.name);
+ Object external = convertToExternal(internalData, stateArg);
+ externalState.put(stateArg.name, external);
+ }
+ return externalState;
+ }
+
+ /**
+ * Update mutated state after eval() invocation. Converts external objects
to internal format.
+ */
+ void updateStateForPartition(Row partitionKey, Map<String, Object>
externalState)
+ throws Exception {
+ Map<String, Object> internalState = new HashMap<>();
+ for (ProcessTableFunctionTestHarness.StateArgumentInfo stateArg :
stateArguments) {
+ Object external = externalState.get(stateArg.name);
+ Object internalData = convertToInternal(external, stateArg);
+ internalState.put(stateArg.name, internalData);
+ }
+ stateByPartition.put(partitionKey, internalState);
+ }
+
+ /** Clear all state for a partition. */
+ void clearStateForPartition(Row partitionKey) {
+ stateByPartition.remove(partitionKey);
+ }
+
+ /** Clear specific state entry for a given partition, resetting it to its
default value. */
+ void clearStateEntry(Row partitionKey, String stateName) {
+ Map<String, Object> internalState = stateByPartition.get(partitionKey);
+ if (internalState != null) {
+ ProcessTableFunctionTestHarness.StateArgumentInfo stateArg =
+ findStateArgument(stateName);
+ internalState.put(stateName, createNewStateInternalData(stateArg));
+ }
+ }
+
+ /** Set initial state for a given partition. */
+ void setInitialState(String stateName, Row partitionKey, Object
externalState)
+ throws Exception {
+ ProcessTableFunctionTestHarness.StateArgumentInfo stateArg =
findStateArgument(stateName);
+ Object internalData = convertToInternal(externalState, stateArg);
+
+ Map<String, Object> internalState =
+ stateByPartition.computeIfAbsent(partitionKey, k ->
createNewPartitionState());
+ internalState.put(stateName, internalData);
+ }
+
+ /** Get the state for given partition. */
+ @SuppressWarnings("unchecked")
+ <T> T getStateForKey(String stateName, Row partitionKey) {
+ Map<String, Object> internalState = stateByPartition.get(partitionKey);
+ if (internalState == null) {
+ return null;
+ }
+ Object internalData = internalState.get(stateName);
+ if (internalData == null) {
+ return null;
+ }
+ return (T) convertToExternal(internalData,
findStateArgument(stateName));
+ }
+
+ /** Get all partition keys that have a specific state entry. */
+ Set<Row> getStateKeys(String stateName) {
Review Comment:
```suggestion
Set<Row> getPartitionKeysForState(String stateName) {
```
?
The method names in this class seem a bit inconsistent (key vs. partition,
access of a single state vs all states of a partition, etc.).
I know this is just an internal class that's not part of the public API but
I think it's always a good idea to strive for consistent naming.
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessStateManager.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.types.Row;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * State manager for {@link ProcessTableFunctionTestHarness}.
+ *
+ * <p>Handles state storage, lifecycle, and conversion between external and
internal storage
+ * formats.
+ */
+@Internal
+class TestHarnessStateManager {
Review Comment:
Should the TestHarnessStateManager check that all provided partition
keys are valid (according to the `partition by` info of the harness)?
##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -1062,4 +1110,309 @@ void testPartitionByDuplicateConfigThrows() {
assertThat(exception.getMessage()).contains("Partition config already
exists");
}
+
+ //
-------------------------------------------------------------------------
+ // State Tests
+ //
-------------------------------------------------------------------------
+
+ @Test
+ void testPojoState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ assertThat(harness.getOutput()).containsExactly(Row.of("Alice", 1L));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(1L);
+
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("Alice", 2L));
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStatePartitionIsolation() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState aliceState =
harness.getStateForKey("state", Row.of("Alice"));
+ PTFWithPojoState.CounterState bobState =
harness.getStateForKey("state", Row.of("Bob"));
+
+ assertThat(aliceState.counter).isEqualTo(2L);
+ assertThat(bobState.counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testPojoStateWithInitialState() throws Exception {
+ PTFWithPojoState.CounterState initialState = new
PTFWithPojoState.CounterState();
+ initialState.counter = 100L;
+
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<id
INT>"))
+ .withPartitionBy("input", "id")
+ .withInitialStateArgument("state", Row.of(1),
initialState)
+ .build();
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of(1));
+ assertThat(state.counter).isEqualTo(100L);
+
+ harness.processElement(Row.of(1));
+ assertThat(harness.getOutput()).containsExactly(Row.of(1, 101L));
+
+ harness.processElement(Row.of(2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of(2, 1L));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetStateKeys() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+ harness.processElementForTable("input", Row.of("Charlie", 30));
+
+ java.util.Set<Row> keys = harness.getStateKeys("state");
+ assertThat(keys)
+ .containsExactlyInAnyOrder(Row.of("Alice"), Row.of("Bob"),
Row.of("Charlie"));
+
+ harness.close();
+ }
+
+ @Test
+ void testGetAllState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+ harness.processElementForTable("input", Row.of("Bob", 20));
+
+ java.util.Map<Row, PTFWithPojoState.CounterState> allState =
harness.getAllState("state");
+
+ assertThat(allState).hasSize(2);
+ assertThat(allState.get(Row.of("Alice")).counter).isEqualTo(2L);
+ assertThat(allState.get(Row.of("Bob")).counter).isEqualTo(1L);
+
+ harness.close();
+ }
+
+ @Test
+ void testListViewState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithListViewState.class)
+ .withTableArgument("input", DataTypes.of("ROW<key
STRING, value INT>"))
+ .withPartitionBy("input", "key")
+ .build();
+
+ harness.processElementForTable("input", Row.of("A", 1));
+ assertThat(harness.getOutput()).containsExactly(Row.of("A", new
Integer[] {1}));
+
+ harness.processElementForTable("input", Row.of("A", 2));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("A", new
Integer[] {1, 2}));
+
+ org.apache.flink.table.api.dataview.ListView<Integer> listState =
+ harness.getStateForKey("listState", Row.of("A"));
+ assertThat(listState.get()).containsExactly(1, 2);
+
+ harness.close();
+ }
+
+ @Test
+ void testMapViewState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithMapViewState.class)
+ .withTableArgument(
+ "input", DataTypes.of("ROW<partition STRING,
key STRING>"))
+ .withPartitionBy("input", "partition")
+ .build();
+
+ harness.processElementForTable("input", Row.of("P1", "foo"));
+ assertThat(harness.getOutput()).containsExactly(Row.of("P1", "foo",
1));
+
+ harness.processElementForTable("input", Row.of("P1", "foo"));
+ assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("P1", "foo",
2));
+
+ harness.processElementForTable("input", Row.of("P1", "bar"));
+
+ org.apache.flink.table.api.dataview.MapView<String, Integer> mapState =
+ harness.getStateForKey("mapState", Row.of("P1"));
+ assertThat(mapState.get("foo")).isEqualTo(2);
+ assertThat(mapState.get("bar")).isEqualTo(1);
+
+ harness.close();
+ }
+
+ @Test
+ void testEmptyState() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+
+ assertThat(state).isNull();
+
+ harness.close();
+ }
+
+ @Test
+ void testClearStateForPartition() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.clearStateForPartition(Row.of("Alice"));
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state).isNull();
+
+ harness.close();
+ }
+
+ @Test
+ void testClearStateEntry() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+ ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+ .withTableArgument("input", DataTypes.of("ROW<name
STRING, value INT>"))
+ .withPartitionBy("input", "name")
+ .build();
+
+ harness.processElementForTable("input", Row.of("Alice", 10));
+ harness.processElementForTable("input", Row.of("Alice", 15));
+
+ PTFWithPojoState.CounterState state = harness.getStateForKey("state",
Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(2L);
+
+ harness.clearStateEntry(Row.of("Alice"), "state");
+
+ state = harness.getStateForKey("state", Row.of("Alice"));
+ assertThat(state.counter).isEqualTo(0L);
+
+ harness.close();
+ }
+
+ @Test
+ void testMultipleStateParameters() throws Exception {
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithMultipleStates.class)
+ .withTableArgument("input", DataTypes.of("ROW<key
STRING, value INT>"))
+ .withPartitionBy("input", "key")
+ .build();
+
+ harness.processElementForTable("input", Row.of("A", 10));
+ harness.processElementForTable("input", Row.of("A", 20));
+ harness.processElementForTable("input", Row.of("B", 5));
+
+ assertThat(harness.getOutput())
+ .containsExactly(Row.of("A", 1L, 10), Row.of("A", 2L, 30),
Row.of("B", 1L, 5));
+
+ PTFWithMultipleStates.CounterState counterA =
+ harness.getStateForKey("counter", Row.of("A"));
+ assertThat(counterA.count).isEqualTo(2L);
+
+ ListView<Integer> historyA = harness.getStateForKey("history",
Row.of("A"));
+ assertThat(historyA.get()).containsExactly(10, 20);
+
+ harness.close();
+ }
+
+ @Test
+ void testInitialStateWithListView() throws Exception {
+ ListView<Integer> initialList = new ListView<>();
+ initialList.add(100);
+ initialList.add(200);
+
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithListViewState.class)
+ .withTableArgument("input", DataTypes.of("ROW<key
STRING, value INT>"))
+ .withPartitionBy("input", "key")
+ .withInitialStateArgument("listState", Row.of("A"),
initialList)
+ .build();
+
+ ListView<Integer> listState = harness.getStateForKey("listState",
Row.of("A"));
+ assertThat(listState.get()).containsExactly(100, 200);
+
+ harness.processElementForTable("input", Row.of("A", 3));
+ assertThat(harness.getOutput()).containsExactly(Row.of("A", new
Integer[] {100, 200, 3}));
+
+ harness.close();
+ }
+
+ @Test
+ void testInitialStateWithMapView() throws Exception {
+ MapView<String, Integer> initialMap = new MapView<>();
+ initialMap.put("existing", 42);
+
+ ProcessTableFunctionTestHarness<Row> harness =
+
ProcessTableFunctionTestHarness.ofClass(PTFWithMapViewState.class)
+ .withTableArgument(
+ "input", DataTypes.of("ROW<partition STRING,
key STRING>"))
+ .withPartitionBy("input", "partition")
+ .withInitialStateArgument("mapState", Row.of("P1"),
initialMap)
+ .build();
+
+ MapView<String, Integer> mapState = harness.getStateForKey("mapState",
Row.of("P1"));
+ assertThat(mapState.get("existing")).isEqualTo(42);
+
+ harness.processElementForTable("input", Row.of("P1", "existing"));
+ assertThat(harness.getOutput()).containsExactly(Row.of("P1",
"existing", 43));
+
+ harness.close();
+ }
+
Review Comment:
Could you add more tests for failing harness configurations or invalid PTFs? For example:
* a stateful PTF without `PARTITION BY`?
* adding an initial state with a key that is incompatible with the partitioning
(IMO nice to have, if it can be checked with reasonable effort) — possibly
covering all cases of user-provided partitioning keys?
* Is it possible for a PTF implementation to use state incorrectly
(incompatible types, incorrect type annotations, etc.)? Would such errors be
caught outside the harness? If not, how would the harness reveal such
failures? Or is everything type-safe enough that users can't really make
mistakes here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]