da-daken opened a new issue, #691: URL: https://github.com/apache/flink-agents/issues/691
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.

### Description

### Problem

ActionState is persisted to the external ActionStateStore to support durable execution recovery. The current production store implementation is Kafka.

When a checkpoint completes, the following call is made:

```java
actionStateStore.pruneState(entry.getKey(), entry.getValue());
```

The intent is to prune action states for a Flink key up to the completed sequence number. However, the current KafkaActionStateStore.pruneState(...) implementation only removes matching entries from the in-memory actionStates cache. It does not delete the corresponding records from the Kafka topic.

Once the completed sequence number has been covered by a successful checkpoint, these historical keys are no longer useful for recovery, but they can still accumulate indefinitely in Kafka.

### Solution

After a checkpoint has completed, the pruned ActionState records should also be deleted from the Kafka topic used by KafkaActionStateStore, not only from the in-memory cache. Because the Kafka topic is configured with cleanup.policy=compact, writing a tombstone record (key = stateKey, value = null) allows Kafka log compaction to eventually remove all historical records for that ActionState key.

```java
producer.send(new ProducerRecord<>(topic, stateKey, null));
```

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
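
To make the proposal under Solution more concrete, here is a minimal sketch of a pruneState that sends a tombstone for every cache entry it removes. It is not the real KafkaActionStateStore. The class TombstonePruneSketch, the Owner value type, the put and size helpers, and the cache layout (state key mapped to its Flink key and sequence number) are all hypothetical. The BiConsumer stands in for producer.send(new ProducerRecord<>(topic, stateKey, null)) so the sketch runs without a broker. Treating "up to the completed sequence number" as inclusive is also an assumption.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical sketch, not the real KafkaActionStateStore. */
class TombstonePruneSketch {

    /** Hypothetical cache value: the Flink key and sequence number a state key belongs to. */
    static final class Owner {
        final String flinkKey;
        final long seqNum;

        Owner(String flinkKey, long seqNum) {
            this.flinkKey = flinkKey;
            this.seqNum = seqNum;
        }
    }

    // Stand-in for the in-memory actionStates cache: stateKey -> owner.
    private final Map<String, Owner> actionStates = new LinkedHashMap<>();

    // Stand-in for producer.send(new ProducerRecord<>(topic, key, value)).
    private final BiConsumer<String, byte[]> sender;

    TombstonePruneSketch(BiConsumer<String, byte[]> sender) {
        this.sender = sender;
    }

    void put(String stateKey, String flinkKey, long seqNum) {
        actionStates.put(stateKey, new Owner(flinkKey, seqNum));
    }

    int size() {
        return actionStates.size();
    }

    /**
     * Removes cached entries of flinkKey with seqNum <= completedSeqNum (assumed inclusive)
     * and sends a tombstone (null value) for each removed state key.
     * Returns the pruned state keys.
     */
    List<String> pruneState(String flinkKey, long completedSeqNum) {
        List<String> pruned = new ArrayList<>();
        Iterator<Map.Entry<String, Owner>> it = actionStates.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Owner> entry = it.next();
            Owner owner = entry.getValue();
            if (owner.flinkKey.equals(flinkKey) && owner.seqNum <= completedSeqNum) {
                sender.accept(entry.getKey(), null); // tombstone: same key, null value
                it.remove();                         // existing behavior: drop from the cache
                pruned.add(entry.getKey());
            }
        }
        return pruned;
    }

    public static void main(String[] args) {
        // This sender only logs the key; pruneState only ever passes a null value.
        TombstonePruneSketch store = new TombstonePruneSketch(
                (key, value) -> System.out.println("tombstone for " + key));
        store.put("k1-1", "k1", 1L);
        store.put("k1-2", "k1", 2L);
        store.put("k1-3", "k1", 3L);
        store.pruneState("k1", 2L);
        System.out.println("remaining cache entries: " + store.size());
    }
}
```

Two caveats apply to the real store. A tombstone only affects records that share exactly the same key, so it must be written with the same stateKey that was used when the ActionState was produced. Compaction is also asynchronous, and the tombstones themselves are kept for the topic's delete.retention.ms before they are dropped, so the topic shrinks eventually rather than immediately.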
