weiqingy opened a new issue, #711: URL: https://github.com/apache/flink-agents/issues/711
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.

### Description

## Background

PR #657 added TTL support for short-term memory state via three config options under `AgentExecutionOptions`:

- `SHORT_TERM_MEMORY_STATE_TTL_MS`
- `SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE`
- `SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY`

Inside `OperatorStateManager.maybeEnableShortTermMemoryTTL`, the cleanup strategy is hardcoded to `cleanupFullSnapshot()`:

```java
StateTtlConfig ttlConfig =
    StateTtlConfig.newBuilder(Duration.ofMillis(ttlMs))
        .setUpdateType(toFlinkUpdateType(updateType))
        .setStateVisibility(toFlinkStateVisibility(stateVisibility))
        .cleanupFullSnapshot() // hardcoded
        .build();
```

## Problem

Flink's `StateTtlConfig.CleanupStrategies` offers four cleanup paths, each with different operational trade-offs:

| Strategy | When state is reclaimed | Best for |
| --- | --- | --- |
| Lazy (default, no explicit setter) | On read access only | Read-heavy workloads |
| `cleanupFullSnapshot()` (current hardcoded choice) | During full checkpoint snapshots | Pipelines with frequent full checkpoints |
| `cleanupIncrementally(...)` | Incrementally on every state access | Bounded latency overhead per access |
| `cleanupInRocksdbCompactFilter(...)` | During RocksDB background compaction | Long-running RocksDB-backed pipelines |

For long-running RocksDB pipelines using incremental checkpoints, full-snapshot cleanup means expired entries can persist on disk for extended periods, increasing on-disk state size unnecessarily. `cleanupInRocksdbCompactFilter` is the recommended strategy in that scenario.

Today users cannot pick the right strategy for their deployment without forking the runtime.

## Proposal

Add a fourth public config option, e.g. `SHORT_TERM_MEMORY_STATE_TTL_CLEANUP_STRATEGY`, mirroring Flink's strategies via a new public wrapper enum (`ShortTermMemoryTtlCleanupStrategy`) in `org.apache.flink.agents.api.agents` to keep Flink internals out of our public API surface (same pattern as `ShortTermMemoryTtlUpdate` / `ShortTermMemoryTtlVisibility` from #657).

Default should remain `FULL_SNAPSHOT` for behavioral parity with the initial implementation.

Mirror on the Python side (`python/flink_agents/api/core_options.py`) for Java↔Python parity, with a matching compatibility test entry.

## Related

- PR #657 (parent feature)
- Flink docs: State Time-To-Live (TTL)

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
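
To make the proposal in this issue concrete, here is a minimal Java sketch of how a wrapper enum could select the cleanup call. It is an illustration under stated assumptions, not the actual implementation: only `ShortTermMemoryTtlCleanupStrategy` and `FULL_SNAPSHOT` are named in the issue, while the other constant names, `fromConfigValue`, `TtlCleanupMapper`, and the argument values `10`, `false`, and `1000L` are hypothetical. `TtlCleanupBuilder` is a stand-in for the cleanup methods of Flink's `StateTtlConfig.Builder`, so the sketch compiles without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical wrapper enum; only FULL_SNAPSHOT is named in the issue. */
enum ShortTermMemoryTtlCleanupStrategy {
    LAZY,                      // no explicit cleanup setter is called
    FULL_SNAPSHOT,             // cleanupFullSnapshot(), today's hardcoded choice
    INCREMENTAL,               // cleanupIncrementally(...)
    ROCKSDB_COMPACTION_FILTER; // cleanupInRocksdbCompactFilter(...)

    /** Parses a config string; a missing or blank value falls back to FULL_SNAPSHOT. */
    static ShortTermMemoryTtlCleanupStrategy fromConfigValue(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return FULL_SNAPSHOT;
        }
        // valueOf throws IllegalArgumentException for unknown names.
        return valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }
}

/** Stand-in for the cleanup setters on Flink's StateTtlConfig.Builder (assumption). */
interface TtlCleanupBuilder {
    TtlCleanupBuilder cleanupFullSnapshot();
    TtlCleanupBuilder cleanupIncrementally(int cleanupSize, boolean runCleanupForEveryRecord);
    TtlCleanupBuilder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries);
}

/** Hypothetical helper that replaces the hardcoded call with a strategy switch. */
final class TtlCleanupMapper {
    private TtlCleanupMapper() {}

    static TtlCleanupBuilder apply(TtlCleanupBuilder builder,
                                   ShortTermMemoryTtlCleanupStrategy strategy) {
        switch (strategy) {
            case LAZY:
                return builder; // rely on the backend's default behavior
            case FULL_SNAPSHOT:
                return builder.cleanupFullSnapshot();
            case INCREMENTAL:
                return builder.cleanupIncrementally(10, false); // placeholder values
            case ROCKSDB_COMPACTION_FILTER:
                return builder.cleanupInRocksdbCompactFilter(1000L); // placeholder value
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }
}

/** Records which cleanup setter was invoked, for demonstration only. */
final class RecordingBuilder implements TtlCleanupBuilder {
    final List<String> calls = new ArrayList<>();

    @Override
    public TtlCleanupBuilder cleanupFullSnapshot() {
        calls.add("cleanupFullSnapshot");
        return this;
    }

    @Override
    public TtlCleanupBuilder cleanupIncrementally(int cleanupSize, boolean runCleanupForEveryRecord) {
        calls.add("cleanupIncrementally");
        return this;
    }

    @Override
    public TtlCleanupBuilder cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) {
        calls.add("cleanupInRocksdbCompactFilter");
        return this;
    }
}
```

In the runtime, a switch like the one in `TtlCleanupMapper.apply` would presumably operate on the real builder inside `maybeEnableShortTermMemoryTTL`. Whether the incremental and RocksDB parameters should be exposed as further options is left open here.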
