gaborgsomogyi commented on PR #28142:
URL: https://github.com/apache/flink/pull/28142#issuecomment-4729949455
Two test coverage gaps worth addressing before merge. No production code
changes needed unless a bug is found while writing these tests.
**1. Streaming API — `SavepointReader.readKeyedState(..., SavepointKeyFilter)` is untested**
The new overload is the primary programmatic interface for filter use
outside SQL, but the full path (filter construction, split pruning, key-level
iteration) has no test. A test should be added to
`SavepointReaderKeyedStateITCase` following the existing pattern:
```java
@Test
void readKeyedStateWithExactFilter() throws Exception {
    AtomicInteger callCount = new AtomicInteger();
    List<Long> result = savepointReader
        .readKeyedState(
            OperatorIdentifier.forUid(uid),
            new CountingReader(callCount),
            Types.LONG,
            Types.LONG,
            SavepointKeyFilter.exact(5L))
        .executeAndCollect();
    assertThat(result).containsExactly(5L);
    // proves only key=5 was iterated, not all keys
    assertThat(callCount.get()).isEqualTo(1);
}
```
The `callCount` assertion is what distinguishes a pushdown test from a
correctness test — it proves the reader was invoked fewer times than the total
number of keys in the savepoint. Coverage should include exact filter, range
filter, and empty filter.
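One thing to check when writing `CountingReader`: if the reader function is shipped to the task through Java serialization (an assumption here, though it is how Flink usually handles user functions), an `AtomicInteger` passed in through the constructor is copied, and the test's instance would stay at 0. The sketch below uses only the JDK to show the effect and one possible workaround, a static counter. The `CountingReader` shown is a hypothetical stand-in, not the Flink `KeyedStateReaderFunction`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingReaderSketch {

    /** Hypothetical stand-in for a reader function that counts invocations. */
    static class CountingReader implements Serializable {
        private static final long serialVersionUID = 1L;

        // Static: shared by every copy loaded in the same JVM.
        static final AtomicInteger STATIC_CALLS = new AtomicInteger();

        // Instance field: each deserialized copy gets its own counter.
        private final AtomicInteger instanceCalls;

        CountingReader(AtomicInteger instanceCalls) {
            this.instanceCalls = instanceCalls;
        }

        void readKey(long key) {
            instanceCalls.incrementAndGet();
            STATIC_CALLS.incrementAndGet();
        }
    }

    /** Serializes and deserializes an object, mimicking a function being shipped to a task. */
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger callCount = new AtomicInteger();
        CountingReader shipped = roundTrip(new CountingReader(callCount));
        shipped.readKey(5L);

        // The test-side instance counter is untouched; the static one is updated.
        System.out.println("instance counter seen by test: " + callCount.get());
        System.out.println("static counter seen by test: "
                + CountingReader.STATIC_CALLS.get());
    }
}
```

If a static counter is used in the real test, it should be reset at the start of each test method so that counts from other tests do not leak in.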
---
**2. SQL level — existing filter tests verify correctness only, not that pushdown fired**
Tests like `testFilterPushDownEqualityReturnsOnlyMatchingKey` assert on the
result set size. They would pass even if pushdown never fired and every filter
was evaluated post-scan by Flink's query engine. Pushdown can be verified via
plan inspection — when a filter is accepted by the source, the planner removes
it from the logical plan; when it is not, a `Calc` node appears above the scan:
```java
@Test
void filterPushDownAppearsInQueryPlan() throws Exception {
    StreamTableEnvironment tEnv = createBatchTableEnv();
    tEnv.executeSql(STATE_TABLE_DDL);
    String plan = tEnv.explainSql("SELECT k FROM state_table WHERE k = 5");
    // pushed-down filter: no Calc node above the source
    assertThat(plan).doesNotContain("Calc(");
}

@Test
void nonPushableFilterRemainsAsCalcNode() throws Exception {
    StreamTableEnvironment tEnv = createBatchTableEnv();
    tEnv.executeSql(STATE_TABLE_DDL);
    String plan = tEnv.explainSql("SELECT k FROM state_table WHERE k % 2 = 0");
    // non-pushable filter: Calc node must be present
    assertThat(plan).contains("Calc(");
}
```
Other Flink connectors use this pattern via `verifyRelPlan` in the planner
test module. The `explainSql` approach is a simpler equivalent available in the
state processing API module.
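A bare `Calc(` check may be too coarse if the planner keeps a `Calc` node for projection alone (for example when the table has more columns than the query selects). A slightly stricter check could look only at the optimized physical plan section and only for `Calc` nodes that still carry a `where=[...]` condition. The helper below is a sketch in plain Java; the class and method names are hypothetical, and the section header and `where=[` marker are assumptions about the `explainSql` text format that should be confirmed against real output.

```java
import java.util.Arrays;

public class PlanFilterCheck {

    // Assumed section header in the explainSql output.
    private static final String OPTIMIZED_HEADER = "== Optimized Physical Plan ==";

    /**
     * Returns true if the optimized physical plan section contains a Calc node
     * that still carries a filter condition (a "where=[" attribute).
     */
    static boolean hasResidualFilter(String explainOutput) {
        int start = explainOutput.indexOf(OPTIMIZED_HEADER);
        if (start < 0) {
            throw new IllegalArgumentException("No optimized physical plan section found");
        }
        int bodyStart = start + OPTIMIZED_HEADER.length();
        // The section ends where the next "== ... ==" header begins, if any.
        int end = explainOutput.indexOf("\n== ", bodyStart);
        String section = end < 0
                ? explainOutput.substring(bodyStart)
                : explainOutput.substring(bodyStart, end);

        return Arrays.stream(section.split("\n"))
                .anyMatch(line -> line.contains("Calc(") && line.contains("where=["));
    }

    public static void main(String[] args) {
        // Hand-written plan text for illustration only, not captured from a real run.
        String notPushed =
                "== Optimized Physical Plan ==\n"
                        + "Calc(select=[k], where=[=(MOD(k, 2), 0)])\n"
                        + "+- TableSourceScan(table=[[state_table]], fields=[k, v])\n";
        String projectionOnly =
                "== Optimized Physical Plan ==\n"
                        + "Calc(select=[k])\n"
                        + "+- TableSourceScan(table=[[state_table]], fields=[k, v])\n";

        System.out.println(hasResidualFilter(notPushed));
        System.out.println(hasResidualFilter(projectionOnly));
    }
}
```

In the tests, `assertThat(hasResidualFilter(plan)).isFalse()` would then replace the `doesNotContain("Calc(")` assertion, and `isTrue()` would replace the `contains("Calc(")` one.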