gaborgsomogyi commented on code in PR #28142:
URL: https://github.com/apache/flink/pull/28142#discussion_r3436267392


##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java:
##########
@@ -116,6 +118,54 @@ void testMaxParallelismRespected(boolean asyncState) throws Exception {
                 splits.length);
     }
 
+    @ParameterizedTest(name = "Enable async state = {0}")
+    @ValueSource(booleans = {false, true})
+    void testExactKeyFilterPrunesInputSplits(boolean asyncState) throws Exception {
+        OperatorID operatorID = OperatorIDGenerator.fromUid("uid");
+
+        OperatorSubtaskState state =
+                createOperatorSubtaskState(createFlatMap(asyncState), asyncState);
+        OperatorState operatorState = new OperatorState(null, null, operatorID, 1, 128);
+        operatorState.putState(0, state);
+
+        KeyedStateInputFormat<?, ?, ?> format =
+                new KeyedStateInputFormat<>(
+                        operatorState,
+                        new HashMapStateBackend(),
+                        new Configuration(),
+                        new KeyedStateReaderOperator<>(new ReaderFunction(), Types.INT),
+                        new ExecutionConfig(),
+                        SavepointKeyFilter.exact(5));
+        KeyGroupRangeInputSplit[] splits = format.createInputSplits(10);
+
+        assertThat(splits)
+                .as("Single-key exact filter maps to exactly one key group range")
+                .hasSize(1);
+    }
+
+    @ParameterizedTest(name = "Enable async state = {0}")
+    @ValueSource(booleans = {false, true})
+    void testEmptyFilterProducesNoInputSplits(boolean asyncState) throws Exception {
+        OperatorID operatorID = OperatorIDGenerator.fromUid("uid");
+
+        OperatorSubtaskState state =
+                createOperatorSubtaskState(createFlatMap(asyncState), asyncState);
+        OperatorState operatorState = new OperatorState(null, null, operatorID, 1, 128);
+        operatorState.putState(0, state);
+
+        KeyedStateInputFormat<?, ?, ?> format =
+                new KeyedStateInputFormat<>(
+                        operatorState,
+                        new HashMapStateBackend(),
+                        new Configuration(),
+                        new KeyedStateReaderOperator<>(new ReaderFunction(), Types.INT),
+                        new ExecutionConfig(),
+                        SavepointKeyFilter.empty());
+        KeyGroupRangeInputSplit[] splits = format.createInputSplits(10);
+
+        assertThat(splits).isEmpty();
+    }
+
     @ParameterizedTest(name = "Enable async state = {0}")
     @ValueSource(booleans = {false, true})

Review Comment:
   Exact and empty filter split pruning are covered, but a range filter case is missing.
   
   Range filters cannot prune splits. Key-group assignment hashes the key, and that hash does not preserve key ordering, so the keys in a range can land in any key group. A range filter must therefore produce the same number of splits as no filter at all. No test currently pins down this behaviour, so a future change that accidentally prunes splits for range filters would go undetected:
   
   ```java
   @ParameterizedTest(name = "Enable async state = {0}")
   @ValueSource(booleans = {false, true})
   void testRangeFilterDoesNotPruneSplits(boolean asyncState) throws Exception {
       // setup same as the other tests...
       KeyedStateInputFormat<?, ?, ?> formatRange = new KeyedStateInputFormat<>(
               operatorState, ...,
               SavepointKeyFilter.range(3, true, 7, true));
   
       KeyedStateInputFormat<?, ?, ?> formatNoFilter = new KeyedStateInputFormat<>(
               operatorState, ..., null);
   
       assertThat(formatRange.createInputSplits(10))
               .hasSize(formatNoFilter.createInputSplits(10).length);
   }
   ```



##########
flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java:
##########
@@ -85,6 +89,151 @@ public void testUserKeyedStateReader() throws Exception {
                 "Unexpected results from keyed state", expected, new HashSet<>(results));
     }
 
+    @Test
+    public void testReadKeyedStateWithExactFilter() throws Exception {
+        Tuple2<Configuration, B> backendTuple = getStateBackendTuple();
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0);
+        env.setParallelism(4);
+
+        applyStatefulPipeline(env);
+
+        String savepointPath = takeSavepoint(env);
+
+        SavepointReader savepoint = SavepointReader.read(env, savepointPath, backendTuple.f1);
+        CountingReadResult result =
+                readKeyedStateWithCountingReader(savepoint, SavepointKeyFilter.exact(5));
+        // Only key=5 reaches the reader and reads state.
+        assertThat(result.values).containsExactly(5);
+        assertThat(result.counter).isEqualTo(1);
+    }
+
+    @Test
+    public void testReadKeyedStateWithInclusiveRangeFilter() throws Exception {
+        Tuple2<Configuration, B> backendTuple = getStateBackendTuple();
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0);
+        env.setParallelism(4);
+
+        applyStatefulPipeline(env);
+
+        String savepointPath = takeSavepoint(env);
+
+        SavepointReader savepoint = SavepointReader.read(env, savepointPath, backendTuple.f1);
+        CountingReadResult result =
+                readKeyedStateWithCountingReader(
+                        savepoint, SavepointKeyFilter.range(3, true, 6, true));
+        // [3, 6]: only the in-range keys reach the reader and read state.
+        assertThat(result.values).containsExactlyInAnyOrder(3, 4, 5, 6);
+        assertThat(result.counter).isEqualTo(4);
+    }
+
+    @Test
+    public void testReadKeyedStateWithInclusiveLowerExclusiveUpperRangeFilter() throws Exception {
+        Tuple2<Configuration, B> backendTuple = getStateBackendTuple();
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0);
+        env.setParallelism(4);
+
+        applyStatefulPipeline(env);
+
+        String savepointPath = takeSavepoint(env);
+
+        SavepointReader savepoint = SavepointReader.read(env, savepointPath, backendTuple.f1);
+        CountingReadResult result =
+                readKeyedStateWithCountingReader(
+                        savepoint, SavepointKeyFilter.range(3, true, 6, false));
+        // [3, 6): the exclusive upper bound drops key 6.
+        assertThat(result.values).containsExactlyInAnyOrder(3, 4, 5);
+        assertThat(result.counter).isEqualTo(3);
+    }
+
+    @Test
+    public void testReadKeyedStateWithExclusiveLowerInclusiveUpperRangeFilter() throws Exception {
+        Tuple2<Configuration, B> backendTuple = getStateBackendTuple();
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0);
+        env.setParallelism(4);
+
+        applyStatefulPipeline(env);
+
+        String savepointPath = takeSavepoint(env);
+
+        SavepointReader savepoint = SavepointReader.read(env, savepointPath, backendTuple.f1);
+        CountingReadResult result =
+                readKeyedStateWithCountingReader(
+                        savepoint, SavepointKeyFilter.range(3, false, 6, true));
+        // (3, 6]: the exclusive lower bound drops key 3.
+        assertThat(result.values).containsExactlyInAnyOrder(4, 5, 6);
+        assertThat(result.counter).isEqualTo(3);
+    }
+
+    @Test
+    public void testReadKeyedStateWithExclusiveRangeFilter() throws Exception {
+        Tuple2<Configuration, B> backendTuple = getStateBackendTuple();
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0);
+        env.setParallelism(4);
+
+        applyStatefulPipeline(env);
+
+        String savepointPath = takeSavepoint(env);
+
+        SavepointReader savepoint = SavepointReader.read(env, savepointPath, backendTuple.f1);
+        CountingReadResult result =
+                readKeyedStateWithCountingReader(
+                        savepoint, SavepointKeyFilter.range(3, false, 6, false));
+        // (3, 6): both bounds exclusive, dropping keys 3 and 6.
+        assertThat(result.values).containsExactlyInAnyOrder(4, 5);
+        assertThat(result.counter).isEqualTo(2);
+    }
+
+    @Test
+    public void testReadKeyedStateWithEmptyFilter() throws Exception {
+        Tuple2<Configuration, B> backendTuple = getStateBackendTuple();
+        StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(backendTuple.f0);
+        env.setParallelism(4);
+
+        applyStatefulPipeline(env);
+
+        String savepointPath = takeSavepoint(env);
+
+        SavepointReader savepoint = SavepointReader.read(env, savepointPath, backendTuple.f1);
+        CountingReadResult result =
+                readKeyedStateWithCountingReader(savepoint, SavepointKeyFilter.empty());
+        // No key reaches the reader, so no state is read.
+        assertThat(result.values).isEmpty();
+        assertThat(result.counter).isZero();
+    }
+
+    private void applyStatefulPipeline(StreamExecutionEnvironment env) {

Review Comment:
   The streaming API tests cover single-key exact, all four range boundary variants, and empty filters. Two gaps are worth adding:
   
   **Multi-key exact (IN-list)** exercises a different code path in `createInputSplits`: multiple keys can map to several key groups, and the split pruning logic must handle each one. A single-key test does not cover that branch:
   
   ```java
   @Test
   public void testReadKeyedStateWithMultiKeyExactFilter() throws Exception {
       // ...
       CountingReadResult result = readKeyedStateWithCountingReader(
               savepoint,
               SavepointKeyFilter.exact(Set.of(3, 5, 7)));
   
       assertThat(result.values).containsExactlyInAnyOrder(3, 5, 7);
       assertThat(result.counter).isEqualTo(3);
   }
   ```
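One caveat applies to the unit-level counterpart of this test. Distinct keys can hash into the same key group, or into different key groups owned by the same split. An exact filter over three keys is therefore only guaranteed to keep between 1 and 3 splits. A sketch that derives the expected count instead of hard-coding it follows. `MultiKeySplitExpectation` is hypothetical, and the even split partitioning is an assumption. `mix` (murmur3 fmix32) stands in for Flink's hash, so the concrete numbers will differ from Flink's. A real test would presumably compute key groups with `KeyGroupRangeAssignment.assignToKeyGroup` and read the bounds from the returned splits.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for the expected split count of a multi-key exact filter.
// mix() (murmur3 fmix32) stands in for Flink's key-group hash; numbers will differ.
final class MultiKeySplitExpectation {

    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(key.hashCode()), maxParallelism);
    }

    // Index of the split owning keyGroup, assuming numSplits even, contiguous ranges.
    static int splitIndexFor(int keyGroup, int maxParallelism, int numSplits) {
        for (int i = 0; i < numSplits; i++) {
            int endInclusive = (i + 1) * maxParallelism / numSplits - 1;
            if (keyGroup <= endInclusive) {
                return i;
            }
        }
        throw new IllegalArgumentException("key group out of range: " + keyGroup);
    }

    // Splits an exact filter must keep: one per distinct split hit, not one per key.
    static int expectedSplitCount(Set<?> keys, int maxParallelism, int numSplits) {
        Set<Integer> hitSplits = new TreeSet<>();
        for (Object key : keys) {
            int keyGroup = keyGroupFor(key, maxParallelism);
            hitSplits.add(splitIndexFor(keyGroup, maxParallelism, numSplits));
        }
        return hitSplits.size();
    }

    public static void main(String[] args) {
        int expected = expectedSplitCount(Set.of(3, 5, 7), 128, 10);
        System.out.println("exact(3, 5, 7) keeps " + expected + " split(s)");
    }
}
```

By the pigeonhole principle, eleven distinct keys over ten splits always share at least one split. That gives a cheap, hash-independent way to make a test cover several keys landing in one split.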
   
   **Literal type widening at the API level**: the IT case uses `Integer` keys with `Integer` bounds throughout. The widening path (an `Integer` literal against a `Long` key column) is only covered at the translator unit-test level. An IT case with `Long` keys and an `Integer` bound would verify the full chain.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
