autophagy commented on code in PR #28212:
URL: https://github.com/apache/flink/pull/28212#discussion_r3288357572


##########
flink-table/flink-table-test-utils/src/test/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarnessTest.java:
##########
@@ -1062,4 +1110,309 @@ void testPartitionByDuplicateConfigThrows() {
 
         assertThat(exception.getMessage()).contains("Partition config already 
exists");
     }
+
+    // 
-------------------------------------------------------------------------
+    // State Tests
+    // 
-------------------------------------------------------------------------
+
+    @Test
+    void testPojoState() throws Exception {
+        ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+                        .withTableArgument("input", DataTypes.of("ROW<name 
STRING, value INT>"))
+                        .withPartitionBy("input", "name")
+                        .build();
+
+        harness.processElementForTable("input", Row.of("Alice", 10));
+        assertThat(harness.getOutput()).containsExactly(Row.of("Alice", 1L));
+
+        PTFWithPojoState.CounterState state = harness.getStateForKey("state", 
Row.of("Alice"));
+        assertThat(state.counter).isEqualTo(1L);
+
+        harness.processElementForTable("input", Row.of("Alice", 15));
+        assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("Alice", 2L));
+
+        state = harness.getStateForKey("state", Row.of("Alice"));
+        assertThat(state.counter).isEqualTo(2L);
+
+        harness.close();
+    }
+
+    @Test
+    void testPojoStatePartitionIsolation() throws Exception {
+        ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+                        .withTableArgument("input", DataTypes.of("ROW<name 
STRING, value INT>"))
+                        .withPartitionBy("input", "name")
+                        .build();
+
+        harness.processElementForTable("input", Row.of("Alice", 10));
+        harness.processElementForTable("input", Row.of("Bob", 20));
+        harness.processElementForTable("input", Row.of("Alice", 15));
+
+        PTFWithPojoState.CounterState aliceState = 
harness.getStateForKey("state", Row.of("Alice"));
+        PTFWithPojoState.CounterState bobState = 
harness.getStateForKey("state", Row.of("Bob"));
+
+        assertThat(aliceState.counter).isEqualTo(2L);
+        assertThat(bobState.counter).isEqualTo(1L);
+
+        harness.close();
+    }
+
+    @Test
+    void testPojoStateWithInitialState() throws Exception {
+        PTFWithPojoState.CounterState initialState = new 
PTFWithPojoState.CounterState();
+        initialState.counter = 100L;
+
+        ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+                        .withTableArgument("input", DataTypes.of("ROW<id 
INT>"))
+                        .withPartitionBy("input", "id")
+                        .withInitialStateArgument("state", Row.of(1), 
initialState)
+                        .build();
+
+        PTFWithPojoState.CounterState state = harness.getStateForKey("state", 
Row.of(1));
+        assertThat(state.counter).isEqualTo(100L);
+
+        harness.processElement(Row.of(1));
+        assertThat(harness.getOutput()).containsExactly(Row.of(1, 101L));
+
+        harness.processElement(Row.of(2));
+        assertThat(harness.getOutput().get(1)).isEqualTo(Row.of(2, 1L));
+
+        harness.close();
+    }
+
+    @Test
+    void testGetStateKeys() throws Exception {
+        ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+                        .withTableArgument("input", DataTypes.of("ROW<name 
STRING, value INT>"))
+                        .withPartitionBy("input", "name")
+                        .build();
+
+        harness.processElementForTable("input", Row.of("Alice", 10));
+        harness.processElementForTable("input", Row.of("Bob", 20));
+        harness.processElementForTable("input", Row.of("Charlie", 30));
+
+        java.util.Set<Row> keys = harness.getStateKeys("state");
+        assertThat(keys)
+                .containsExactlyInAnyOrder(Row.of("Alice"), Row.of("Bob"), 
Row.of("Charlie"));
+
+        harness.close();
+    }
+
+    @Test
+    void testGetAllState() throws Exception {
+        ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+                        .withTableArgument("input", DataTypes.of("ROW<name 
STRING, value INT>"))
+                        .withPartitionBy("input", "name")
+                        .build();
+
+        harness.processElementForTable("input", Row.of("Alice", 10));
+        harness.processElementForTable("input", Row.of("Alice", 15));
+        harness.processElementForTable("input", Row.of("Bob", 20));
+
+        java.util.Map<Row, PTFWithPojoState.CounterState> allState = 
harness.getAllState("state");
+
+        assertThat(allState).hasSize(2);
+        assertThat(allState.get(Row.of("Alice")).counter).isEqualTo(2L);
+        assertThat(allState.get(Row.of("Bob")).counter).isEqualTo(1L);
+
+        harness.close();
+    }
+
+    @Test
+    void testListViewState() throws Exception {
+        ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(PTFWithListViewState.class)
+                        .withTableArgument("input", DataTypes.of("ROW<key 
STRING, value INT>"))
+                        .withPartitionBy("input", "key")
+                        .build();
+
+        harness.processElementForTable("input", Row.of("A", 1));
+        assertThat(harness.getOutput()).containsExactly(Row.of("A", new 
Integer[] {1}));
+
+        harness.processElementForTable("input", Row.of("A", 2));
+        assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("A", new 
Integer[] {1, 2}));
+
+        org.apache.flink.table.api.dataview.ListView<Integer> listState =
+                harness.getStateForKey("listState", Row.of("A"));
+        assertThat(listState.get()).containsExactly(1, 2);
+
+        harness.close();
+    }
+
+    @Test
+    void testMapViewState() throws Exception {
+        ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(PTFWithMapViewState.class)
+                        .withTableArgument(
+                                "input", DataTypes.of("ROW<partition STRING, 
key STRING>"))
+                        .withPartitionBy("input", "partition")
+                        .build();
+
+        harness.processElementForTable("input", Row.of("P1", "foo"));
+        assertThat(harness.getOutput()).containsExactly(Row.of("P1", "foo", 
1));
+
+        harness.processElementForTable("input", Row.of("P1", "foo"));
+        assertThat(harness.getOutput().get(1)).isEqualTo(Row.of("P1", "foo", 
2));
+
+        harness.processElementForTable("input", Row.of("P1", "bar"));
+
+        org.apache.flink.table.api.dataview.MapView<String, Integer> mapState =
+                harness.getStateForKey("mapState", Row.of("P1"));
+        assertThat(mapState.get("foo")).isEqualTo(2);
+        assertThat(mapState.get("bar")).isEqualTo(1);
+
+        harness.close();
+    }
+
+    @Test
+    void testEmptyState() throws Exception {
+        ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+                        .withTableArgument("input", DataTypes.of("ROW<name 
STRING, value INT>"))
+                        .withPartitionBy("input", "name")
+                        .build();
+
+        PTFWithPojoState.CounterState state = harness.getStateForKey("state", 
Row.of("Alice"));
+
+        assertThat(state).isNull();
+
+        harness.close();
+    }
+
+    @Test
+    void testClearStateForPartition() throws Exception {
+        ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+                        .withTableArgument("input", DataTypes.of("ROW<name 
STRING, value INT>"))
+                        .withPartitionBy("input", "name")
+                        .build();
+
+        harness.processElementForTable("input", Row.of("Alice", 10));
+        harness.processElementForTable("input", Row.of("Alice", 15));
+
+        PTFWithPojoState.CounterState state = harness.getStateForKey("state", 
Row.of("Alice"));
+        assertThat(state.counter).isEqualTo(2L);
+
+        harness.clearStateForPartition(Row.of("Alice"));
+
+        state = harness.getStateForKey("state", Row.of("Alice"));
+        assertThat(state).isNull();
+
+        harness.close();
+    }
+
+    @Test
+    void testClearStateEntry() throws Exception {
+        ProcessTableFunctionTestHarness<Row> harness =
+                ProcessTableFunctionTestHarness.ofClass(PTFWithPojoState.class)
+                        .withTableArgument("input", DataTypes.of("ROW<name 
STRING, value INT>"))
+                        .withPartitionBy("input", "name")
+                        .build();
+
+        harness.processElementForTable("input", Row.of("Alice", 10));
+        harness.processElementForTable("input", Row.of("Alice", 15));
+
+        PTFWithPojoState.CounterState state = harness.getStateForKey("state", 
Row.of("Alice"));
+        assertThat(state.counter).isEqualTo(2L);
+
+        harness.clearStateEntry(Row.of("Alice"), "state");
+
+        state = harness.getStateForKey("state", Row.of("Alice"));
+        assertThat(state.counter).isEqualTo(0L);
+
+        harness.close();
+    }
+
+    @Test
+    void testMultipleStateParameters() throws Exception {
+        ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(PTFWithMultipleStates.class)
+                        .withTableArgument("input", DataTypes.of("ROW<key 
STRING, value INT>"))
+                        .withPartitionBy("input", "key")
+                        .build();
+
+        harness.processElementForTable("input", Row.of("A", 10));
+        harness.processElementForTable("input", Row.of("A", 20));
+        harness.processElementForTable("input", Row.of("B", 5));
+
+        assertThat(harness.getOutput())
+                .containsExactly(Row.of("A", 1L, 10), Row.of("A", 2L, 30), 
Row.of("B", 1L, 5));
+
+        PTFWithMultipleStates.CounterState counterA =
+                harness.getStateForKey("counter", Row.of("A"));
+        assertThat(counterA.count).isEqualTo(2L);
+
+        ListView<Integer> historyA = harness.getStateForKey("history", 
Row.of("A"));
+        assertThat(historyA.get()).containsExactly(10, 20);
+
+        harness.close();
+    }
+
+    @Test
+    void testInitialStateWithListView() throws Exception {
+        ListView<Integer> initialList = new ListView<>();
+        initialList.add(100);
+        initialList.add(200);
+
+        ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(PTFWithListViewState.class)
+                        .withTableArgument("input", DataTypes.of("ROW<key 
STRING, value INT>"))
+                        .withPartitionBy("input", "key")
+                        .withInitialStateArgument("listState", Row.of("A"), 
initialList)
+                        .build();
+
+        ListView<Integer> listState = harness.getStateForKey("listState", 
Row.of("A"));
+        assertThat(listState.get()).containsExactly(100, 200);
+
+        harness.processElementForTable("input", Row.of("A", 3));
+        assertThat(harness.getOutput()).containsExactly(Row.of("A", new 
Integer[] {100, 200, 3}));
+
+        harness.close();
+    }
+
+    @Test
+    void testInitialStateWithMapView() throws Exception {
+        MapView<String, Integer> initialMap = new MapView<>();
+        initialMap.put("existing", 42);
+
+        ProcessTableFunctionTestHarness<Row> harness =
+                
ProcessTableFunctionTestHarness.ofClass(PTFWithMapViewState.class)
+                        .withTableArgument(
+                                "input", DataTypes.of("ROW<partition STRING, 
key STRING>"))
+                        .withPartitionBy("input", "partition")
+                        .withInitialStateArgument("mapState", Row.of("P1"), 
initialMap)
+                        .build();
+
+        MapView<String, Integer> mapState = harness.getStateForKey("mapState", 
Row.of("P1"));
+        assertThat(mapState.get("existing")).isEqualTo(42);
+
+        harness.processElementForTable("input", Row.of("P1", "existing"));
+        assertThat(harness.getOutput()).containsExactly(Row.of("P1", 
"existing", 43));
+
+        harness.close();
+    }
+

Review Comment:
   Regarding the first test case: this should already be covered by 
`testSetSemanticMissingPartitionConfigThrows`, which asserts an exception if 
set-semantic table arguments exist. However, we don't have a test for a 
stateful PTF without any set-semantic tables, so I can add one there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to