gaborgsomogyi opened a new pull request, #28590:
URL: https://github.com/apache/flink/pull/28590
## What is the purpose of the change
The `SavepointKeyFilter` interface introduced in FLINK-39637 used raw `Object` references
throughout (e.g., `boolean test(Object key)`, `Set<Object> getExactKeys()`), which does not fit
the existing State Processor API design: `SavepointReader.readKeyedState` and
`KeyedStateReaderFunction` are both fully parameterized on `K`. This change makes
`SavepointKeyFilter<K>` a proper generic interface so that the key type is enforced by the
compiler end-to-end on the DataStream API path, consistent with the rest of the State Processor
API.
At the same time, the Table API connector layer (`SavepointFilterTranslator`,
`SavepointDynamicTableSource`, `SavepointDataStreamScanProvider`) intentionally keeps raw
`SavepointKeyFilter` -- see "Brief change log" for the rationale.
## Brief change log
- `SavepointKeyFilter<K>`: added type parameter; `test`, `getExactKeys`, `intersect`, `exact`,
  `range`, `filterKeys`, and `empty` are all typed on `K`
- `EmptyKeyFilter<K>`, `ExactKeyFilter<K>`, `RangeKeyFilter<K>`: parameterized accordingly
- `KeyedStateInputFormat<K,N,OUT>`: field and constructor parameter changed to
  `SavepointKeyFilter<K>`; `pruneByExactKeys` and the key-iteration loop now operate on `K`
  instead of `Object`
- `SavepointReader.readKeyedState`: takes a `@Nullable SavepointKeyFilter<K> keyFilter` parameter
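
To illustrate what a typed filter buys on the DataStream path, here is a minimal, self-contained sketch. `KeyFilter`, its `exact`, `range`, and `intersect` methods, and `TypedFilterSketch` are simplified stand-ins written for this description; they are assumptions and do not reproduce the actual `SavepointKeyFilter` signatures.

```java
import java.util.Set;

// Hypothetical, simplified stand-in for a generic key filter; not Flink's source.
interface KeyFilter<K> {
    boolean test(K key);

    // Matches only the given keys.
    static <K> KeyFilter<K> exact(Set<K> keys) {
        return keys::contains;
    }

    // Matches keys in the closed interval [lowerInclusive, upperInclusive].
    static <K extends Comparable<K>> KeyFilter<K> range(K lowerInclusive, K upperInclusive) {
        return key -> key.compareTo(lowerInclusive) >= 0 && key.compareTo(upperInclusive) <= 0;
    }

    // Matches keys accepted by both filters; both must share the same key type K.
    default KeyFilter<K> intersect(KeyFilter<K> other) {
        return key -> test(key) && other.test(key);
    }
}

class TypedFilterSketch {
    public static void main(String[] args) {
        KeyFilter<Integer> filter =
                KeyFilter.range(10, 20).intersect(KeyFilter.exact(Set.of(5, 15, 25)));

        System.out.println(filter.test(15)); // true: inside the range and in the exact set
        System.out.println(filter.test(25)); // false: in the exact set but outside the range

        // filter.test("15"); // rejected by the compiler: String is not Integer
    }
}
```

With an `Object`-based interface the commented-out call would compile and could only misbehave at runtime; with the type parameter it is rejected at compile time.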
**Why the Table API layer stays raw:**
Four sites deliberately remain raw:
1. `EmptyKeyFilter.INSTANCE` -- a static field cannot refer to the class type parameter `K`, so
   the standard erasure-safe singleton pattern (`@SuppressWarnings("rawtypes")` on the field,
   unchecked cast in the `instance()` accessor) is used, same as `Collections.EMPTY_LIST` and
   `Collections.emptyList()`.
2. `SavepointFilterTranslator` internal methods (`apply`, `extractFilter`, `fromEquals`,
   `fromOr`, `fromAnd`, `fromBetween`, `from*Comparison`, FILTERS map) -- this class operates
   entirely in the SQL planner layer. Literal values are extracted via
   `ValueLiteralExpression.getValueAs(literalClass)`, which returns `Object`. The Java type
   parameter `K` is not available at compile time here; only a `DataType` is known at runtime.
   Parameterizing `SavepointFilterTranslator<K>` would be meaningless because no caller can
   supply a `K`.
3. `SavepointFilterTranslator.Result` -- bridges the SQL layer to the connector layer; has no
   `K` to propagate.
4. `SavepointDynamicTableSource` and `SavepointDataStreamScanProvider` -- the Table API
   connector framework (`DynamicTableSource`, `DataStreamScanProvider`) carries no type
   parameter, and `TypeInformation keyTypeInfo` in these classes was already raw before this
   change. Using raw `SavepointKeyFilter` is consistent with that pre-existing constraint. Type
   safety across this boundary is guaranteed at runtime: the key filter and the key type info are
   both derived from the same `DataType` inside `SavepointFilterTranslator`.
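
For readers unfamiliar with the singleton pattern mentioned in item 1, a minimal sketch follows. `SimpleKeyFilter`, `EmptyFilter`, and `SingletonSketch` are hypothetical stand-ins; only the shape (raw static field, unchecked cast in an `instance()` accessor) is taken from the description above, and the real `EmptyKeyFilter` may differ in detail.

```java
// Hypothetical stand-in for the filter interface; not Flink's source.
interface SimpleKeyFilter<K> {
    boolean test(K key);
}

final class EmptyFilter<K> implements SimpleKeyFilter<K> {
    // A static field cannot mention the class type parameter K,
    // so the single shared instance is stored with the raw type.
    @SuppressWarnings("rawtypes")
    private static final EmptyFilter INSTANCE = new EmptyFilter();

    private EmptyFilter() {}

    // The cast is safe because the instance stores no K and rejects every key.
    @SuppressWarnings("unchecked")
    static <K> EmptyFilter<K> instance() {
        return (EmptyFilter<K>) INSTANCE;
    }

    @Override
    public boolean test(K key) {
        return false;
    }
}

class SingletonSketch {
    public static void main(String[] args) {
        SimpleKeyFilter<Integer> ints = EmptyFilter.instance();
        SimpleKeyFilter<String> strings = EmptyFilter.instance();

        System.out.println(ints.test(1));                      // false
        System.out.println((Object) ints == (Object) strings); // true: one shared instance
    }
}
```

The unchecked cast is confined to a single accessor, so callers receive a correctly parameterized filter without seeing any warning.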
## Verifying this change
This change is covered by updated existing tests and can be verified as follows:
- Existing unit tests in `SavepointFilterTranslatorTest` are updated to use typed
  `SavepointKeyFilter<Object>` / `SavepointKeyFilter<Long>` variables; compilation itself
  verifies generic correctness on the translator output path.
- Existing integration tests in `SavepointReaderKeyedStateITCase` cover exact-key, multi-key,
  inclusive/exclusive range, and empty filter scenarios end-to-end with
  `SavepointKeyFilter<Integer>`.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: yes --
  `SavepointKeyFilter` is `@PublicEvolving`; the change is source- and binary-compatible for
  callers that already use the raw type, and is a tightening (more type-safe) API for callers
  that use the generic form
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: no
- The S3 file system connector: no
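
The compatibility note in the public API item above can be illustrated with a small sketch, assuming a hypothetical `TypedFilter<K>` interface that stands in for any interface that gains a type parameter. It is not Flink code; it only shows that a caller using the raw type still compiles (with unchecked warnings), while a typed caller gets compile-time checking.

```java
// Hypothetical stand-in for an interface that gained a type parameter; not Flink's source.
interface TypedFilter<K> {
    boolean test(K key);
}

class RawCallerSketch {
    // Typed caller: the compiler checks that the key is an Integer.
    static boolean typedCall(TypedFilter<Integer> filter, Integer key) {
        return filter.test(key);
    }

    // Raw caller: still compiles after the interface became generic.
    // A key of the wrong type would surface only at runtime, as a ClassCastException.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static boolean rawCall(TypedFilter filter, Object key) {
        return filter.test(key);
    }

    public static void main(String[] args) {
        TypedFilter<Integer> greaterThanTen = key -> key > 10;

        System.out.println(typedCall(greaterThanTen, 15)); // true
        System.out.println(rawCall(greaterThanTen, 5));    // false
    }
}
```

Because `test(K)` erases to `test(Object)`, the method descriptor in the compiled interface is unchanged, which is why previously compiled raw callers would keep linking.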
## Documentation
- Does this pull request introduce a new feature? no (refines a feature from FLINK-39637)
- If yes, how is the feature documented? not applicable
---
##### Was generative AI tooling used to co-author this PR?
- [X] Yes (Claude Code)