gaborgsomogyi commented on PR #28142:
URL: https://github.com/apache/flink/pull/28142#issuecomment-4729949455

   Two test coverage gaps worth addressing before merge. No production code 
changes needed unless a bug is found while writing these tests.
   
   **1. Streaming API: `SavepointReader.readKeyedState(..., SavepointKeyFilter)` is untested**
   
   The new overload is the primary programmatic interface for filter use 
outside SQL, but the full path (filter construction, split pruning, key-level 
iteration) has no test. A test should be added to 
`SavepointReaderKeyedStateITCase` following the existing pattern:
   
   ```java
   @Test
   void readKeyedStateWithExactFilter() throws Exception {
       AtomicInteger callCount = new AtomicInteger();
   
       List<Long> result = savepointReader
               .readKeyedState(
                       OperatorIdentifier.forUid(uid),
                       new CountingReader(callCount),
                       Types.LONG,
                       Types.LONG,
                       SavepointKeyFilter.exact(5L))
               .executeAndCollect();
   
       assertThat(result).containsExactly(5L);
       // proves only key=5 was iterated, not all keys
       assertThat(callCount.get()).isEqualTo(1);
   }
   ```
   
   The `callCount` assertion is what distinguishes a pushdown test from a 
correctness test: assuming `CountingReader` increments the counter once per 
`readKey` call, a count of 1 proves the reader ran only for the matching key 
rather than once per key in the savepoint. Coverage should include an exact 
filter, a range filter, and an empty filter.
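
To make the distinction concrete, here is a minimal, Flink-free sketch of what the counter observes. It is only a model: `PushdownCallCountSketch`, `readKey`, `readWithPushdown`, and `readThenFilter` are hypothetical names and not part of the state processing API. Both paths return the same result, which is why a result-only assertion cannot tell them apart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongPredicate;
import java.util.stream.LongStream;

public class PushdownCallCountSketch {

    /** Stand-in for a reader function: counts each invocation and emits the key. */
    static long readKey(long key, AtomicInteger callCount) {
        callCount.incrementAndGet();
        return key;
    }

    /** Pushdown path: the filter runs before the reader, so the reader sees only matches. */
    static List<Long> readWithPushdown(long[] keys, LongPredicate filter, AtomicInteger callCount) {
        List<Long> out = new ArrayList<>();
        for (long key : keys) {
            if (filter.test(key)) {
                out.add(readKey(key, callCount));
            }
        }
        return out;
    }

    /** Post-scan path: the reader runs for every key and the filter is applied afterwards. */
    static List<Long> readThenFilter(long[] keys, LongPredicate filter, AtomicInteger callCount) {
        List<Long> out = new ArrayList<>();
        for (long key : keys) {
            long value = readKey(key, callCount);
            if (filter.test(value)) {
                out.add(value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] keys = LongStream.rangeClosed(1, 10).toArray();
        LongPredicate exact = k -> k == 5L;

        AtomicInteger pushed = new AtomicInteger();
        AtomicInteger scanned = new AtomicInteger();

        // Same result either way; only the call counts differ.
        System.out.println(readWithPushdown(keys, exact, pushed) + " calls=" + pushed.get());
        System.out.println(readThenFilter(keys, exact, scanned) + " calls=" + scanned.get());
    }
}
```

With ten keys and an exact filter on 5, both methods return `[5]`, but the pushdown path invokes the reader once and the post-scan path invokes it ten times. That difference is the only signal the `callCount` assertion relies on.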
   
   ---
   
   **2. SQL level: existing filter tests verify correctness only, not that pushdown fired**
   
   Tests like `testFilterPushDownEqualityReturnsOnlyMatchingKey` assert only on 
the result set size, so they would still pass if pushdown never fired and every 
filter was evaluated post-scan by Flink's query engine. Pushdown can be verified 
via plan inspection: when the source accepts a filter, the planner drops the 
predicate from the plan above the scan; when it does not, a `Calc` node carrying 
the predicate appears above the scan:
   
   ```java
   @Test
   void filterPushDownAppearsInQueryPlan() throws Exception {
       StreamTableEnvironment tEnv = createBatchTableEnv();
       tEnv.executeSql(STATE_TABLE_DDL);
   
       String plan = tEnv.explainSql("SELECT k FROM state_table WHERE k = 5");
   
       // pushed-down filter: no Calc node above the source
       assertThat(plan).doesNotContain("Calc(");
   }
   
   @Test
   void nonPushableFilterRemainsAsCalcNode() throws Exception {
       StreamTableEnvironment tEnv = createBatchTableEnv();
       tEnv.executeSql(STATE_TABLE_DDL);
   
       String plan = tEnv.explainSql("SELECT k FROM state_table WHERE k % 2 = 0");
   
       // non-pushable filter: Calc node must be present
       assertThat(plan).contains("Calc(");
   }
   ```
   
   Other Flink connectors use this pattern via `verifyRelPlan` in the planner 
test module. The `explainSql` approach is a simpler equivalent available in the 
state processing API module.
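
If the plain substring checks turn out to be brittle, the plan assertions could be factored into small helpers along these lines. This is a sketch under assumptions: `PlanInspectionSketch` and its methods are hypothetical, the two sample plans are hand-written illustrations rather than captured `explainSql` output, and the `filter=[...]` marker inside the scan digest should be confirmed against the real plan text for this source before relying on it.

```java
import java.util.Arrays;

public class PlanInspectionSketch {

    // Hand-written, illustrative plan fragments (assumed shape, not captured output).
    static final String PUSHED_PLAN =
            "TableSourceScan(table=[[default_catalog, default_database, state_table, "
                    + "filter=[=(k, 5)]]], fields=[k])";

    static final String NOT_PUSHED_PLAN =
            "Calc(select=[k], where=[=(MOD(k, 2), 0)])\n"
                    + "+- TableSourceScan(table=[[default_catalog, default_database, "
                    + "state_table]], fields=[k])";

    /** True if any plan line, ignoring tree-drawing prefixes, starts with a Calc node. */
    static boolean hasCalcNode(String plan) {
        return Arrays.stream(plan.split("\\R"))
                .map(line -> line.replaceFirst("^[\\s:+\\-|]*", ""))
                .anyMatch(line -> line.startsWith("Calc("));
    }

    /** True if a scan line records a pushed-down filter in its digest (assumed marker). */
    static boolean scanCarriesFilter(String plan) {
        return Arrays.stream(plan.split("\\R"))
                .anyMatch(line -> line.contains("TableSourceScan(") && line.contains("filter=["));
    }

    public static void main(String[] args) {
        System.out.println("pushed: calc=" + hasCalcNode(PUSHED_PLAN)
                + " scanFilter=" + scanCarriesFilter(PUSHED_PLAN));
        System.out.println("not pushed: calc=" + hasCalcNode(NOT_PUSHED_PLAN)
                + " scanFilter=" + scanCarriesFilter(NOT_PUSHED_PLAN));
    }
}
```

In an actual test the string returned by `tEnv.explainSql(...)` would be passed to these helpers. Checking for a positive marker on the scan, in addition to the absence of `Calc(`, guards against a plan that contains a `Calc` node for reasons unrelated to the filter, such as a projection.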

