Hi Flink devs,

I'm working with the DataStream API V2 and noticed that state TTL cannot be
configured when using the ProcessFunction V2 declarative state API
(usesStates() + StateDeclaration).

Background: two parallel state hierarchies

The V2 architecture has two separate state type hierarchies, bridged by
DefaultStateManager:

- User-facing declarations in flink-core-api (org.apache.flink.api.common.state)
  - StateDeclaration (interface) → ValueStateDeclaration, MapStateDeclaration,
    etc.
  - No TTL methods. Returned by ProcessFunction's usesStates().
- Internal descriptors in flink-core (org.apache.flink.api.common.state.v2)
  - StateDescriptor<T> (abstract class) → ValueStateDescriptor,
    MapStateDescriptor, etc.
  - Has enableTimeToLive(StateTtlConfig) and getTtlConfig().
  - This TTL support was introduced as part of FLINK-35262 (commit 3e68422) by
    implementing DefaultKeyedStateStoreV2.getValueState(ValueStateDescriptor).

These hierarchies have no inheritance relationship: ValueStateDeclaration does
not extend ValueStateDescriptor.

The gap: DefaultStateManager.getStateOptional() drops TTL

The bridge lives in
org.apache.flink.datastream.impl.context.DefaultStateManager. For each state
type, getStateOptional() extracts the name and type information from the
StateDeclaration and constructs a fresh StateDescriptor, but never calls
enableTimeToLive():

    // DefaultStateManager.getStateOptional(ValueStateDeclaration)
    // File: flink-datastream/.../datastream/impl/context/DefaultStateManager.java
    @Override
    public <T> Optional<ValueState<T>> getStateOptional(
            ValueStateDeclaration<T> stateDeclaration) throws Exception {
        ValueStateDescriptor<T> valueStateDescriptor =
                new ValueStateDescriptor<>(
                        stateDeclaration.getName(),
                        TypeExtractor.createTypeInfo(
                                stateDeclaration.getTypeDescriptor().getTypeClass()));
        // ☝️ enableTimeToLive() is never called, so TTL defaults to DISABLED
        return Optional.ofNullable(operatorContext.getValueState(valueStateDescriptor));
    }

The same pattern applies to all state types in DefaultStateManager:

- getStateOptional(ListStateDeclaration): creates ListStateDescriptor, no TTL
- getStateOptional(MapStateDeclaration): creates MapStateDescriptor, no TTL
- getStateOptional(ReducingStateDeclaration): creates ReducingStateDescriptor,
  no TTL
- getStateOptional(AggregatingStateDeclaration): creates
  AggregatingStateDescriptor, no TTL

Why this matters

A user writing a V2 ProcessFunction (e.g.,
TwoOutputEventTimeStreamProcessFunction) must declare state via usesStates(),
which returns Set<StateDeclaration>. State is then accessed through
ctx.getStateManager().getState(decl). At no point in this path can the user
configure a StateTtlConfig. The alternative path shown in the V2 state docs,
RuntimeContext.getState(StateDescriptor) with enableTimeToLive(), is only
available in RichFunction, not in a V2 ProcessFunction.

Proposed fix

The simplest approach: add getTtlConfig() to StateDeclaration and have
DefaultStateManager apply it:

    // StateDeclaration.java: add
    @Nonnull
    default StateTtlConfig getTtlConfig() {
        return StateTtlConfig.DISABLED; // preserves current behavior
    }

    // DefaultStateManager.getStateOptional(): apply
    ValueStateDescriptor<T> valueStateDescriptor =
            new ValueStateDescriptor<>(
                    stateDeclaration.getName(),
                    TypeExtractor.createTypeInfo(
                            stateDeclaration.getTypeDescriptor().getTypeClass()));
    valueStateDescriptor.enableTimeToLive(stateDeclaration.getTtlConfig());

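Since StateTtlConfig.DISABLED is the default, this is fully
backward-compatible: existing declarations keep behaving exactly as they do
today. The concrete ValueStateDeclarationBuilder, MapStateDeclarationBuilder,
etc. would also need a new withTtlConfig() method so that users can actually
set a TTL on a declaration.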
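
To make the intended flow concrete, here is a minimal, self-contained sketch of
how a builder-level withTtlConfig() could feed through the bridge. It does not
use any Flink classes: Ttl, Declaration, DeclarationBuilder, Descriptor and
toDescriptor() are hypothetical stand-ins for StateTtlConfig, StateDeclaration,
the declaration builders, StateDescriptor and the descriptor construction in
DefaultStateManager.getStateOptional(). The real signatures may well need to
differ.

```java
import java.time.Duration;
import java.util.Objects;

/** Illustration only: every type here is a stand-in, not a Flink class. */
public class TtlBridgeSketch {

    /** Stand-in for StateTtlConfig: just an enabled flag and a duration. */
    static final class Ttl {
        static final Ttl DISABLED = new Ttl(false, Duration.ZERO);
        final boolean enabled;
        final Duration timeToLive;

        private Ttl(boolean enabled, Duration timeToLive) {
            this.enabled = enabled;
            this.timeToLive = timeToLive;
        }

        static Ttl of(Duration timeToLive) {
            return new Ttl(true, Objects.requireNonNull(timeToLive));
        }
    }

    /** Stand-in for StateDeclaration with the proposed default method. */
    interface Declaration {
        String getName();

        // Default keeps existing implementations source-compatible.
        default Ttl getTtlConfig() {
            return Ttl.DISABLED;
        }
    }

    /** Stand-in for a declaration builder with a hypothetical withTtlConfig(). */
    static final class DeclarationBuilder {
        private final String name;
        private Ttl ttl = Ttl.DISABLED;

        DeclarationBuilder(String name) {
            this.name = Objects.requireNonNull(name);
        }

        DeclarationBuilder withTtlConfig(Ttl ttl) {
            this.ttl = Objects.requireNonNull(ttl);
            return this;
        }

        Declaration build() {
            final String builtName = name;
            final Ttl builtTtl = ttl;
            return new Declaration() {
                @Override
                public String getName() {
                    return builtName;
                }

                @Override
                public Ttl getTtlConfig() {
                    return builtTtl;
                }
            };
        }
    }

    /** Stand-in for StateDescriptor: TTL is disabled unless enabled explicitly. */
    static final class Descriptor {
        final String name;
        private Ttl ttl = Ttl.DISABLED;

        Descriptor(String name) {
            this.name = name;
        }

        void enableTimeToLive(Ttl ttl) {
            this.ttl = Objects.requireNonNull(ttl);
        }

        Ttl getTtlConfig() {
            return ttl;
        }
    }

    /** Stand-in for the bridge step: copy name and TTL onto a fresh descriptor. */
    static Descriptor toDescriptor(Declaration declaration) {
        Descriptor descriptor = new Descriptor(declaration.getName());
        descriptor.enableTimeToLive(declaration.getTtlConfig());
        return descriptor;
    }

    public static void main(String[] args) {
        Declaration plain = new DeclarationBuilder("count").build();
        Declaration expiring =
                new DeclarationBuilder("session")
                        .withTtlConfig(Ttl.of(Duration.ofHours(1)))
                        .build();
        System.out.println(toDescriptor(plain).getTtlConfig().enabled);
        System.out.println(toDescriptor(expiring).getTtlConfig().timeToLive);
    }
}
```

In this sketch a declaration built without withTtlConfig() still yields a
descriptor with TTL disabled, which is the backward-compatibility property the
proposal relies on.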

Related tickets

- FLINK-34547: [Umbrella] FLIP-408: Introduce DataStream API V2
- FLINK-34977: FLIP-433: State Access on DataStream API V2
- FLINK-34549: FLIP-410: Config, Context and Processing Timer Service of
  DataStream API V2
- FLINK-35262: Bridge between AsyncKeyedStateBackend and
  AsyncExecutionController

Would this approach make sense, or is there a deliberate reason for the
omission? Happy to contribute a PR if folks agree on the direction.

Thanks,
Karol