xintongsong commented on code in PR #657:
URL: https://github.com/apache/flink-agents/pull/657#discussion_r3321638837
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java:
##########
@@ -121,6 +131,63 @@ void initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext
PENDING_INPUT_EVENT_STATE_NAME,
TypeInformation.of(Event.class)));
}
+ /**
+ * When {@link AgentExecutionOptions#SHORT_TERM_MEMORY_STATE_TTL_MS} is positive, attaches Flink
+ * {@link StateTtlConfig} to the short-term memory {@link MapStateDescriptor}. Unset, null, or
+ * non-positive values disable TTL (Flink does not allow zero/negative TTL).
+ */
+ private void maybeEnableShortTermMemoryTTL(
+ MapStateDescriptor<String, MemoryObjectImpl.MemoryItem> descriptor,
+ AgentPlan agentPlan) {
+ Long ttlMs =
+ agentPlan.getConfig().get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS);
+ if (ttlMs == null || ttlMs <= 0) {
+ return;
+ }
+
+ ShortTermMemoryTtlUpdate updateType =
+ agentPlan
+ .getConfig()
+ .get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE);
+
+ ShortTermMemoryTtlVisibility stateVisibility =
+ agentPlan
+ .getConfig()
+ .get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY);
+
+ StateTtlConfig ttlConfig =
+ StateTtlConfig.newBuilder(Duration.ofMillis(ttlMs))
+ .setUpdateType(toFlinkUpdateType(updateType))
+ .setStateVisibility(toFlinkStateVisibility(stateVisibility))
+ .cleanupFullSnapshot()
+ .build();
+ descriptor.enableTimeToLive(ttlConfig);
+ }
+
+ private StateTtlConfig.UpdateType toFlinkUpdateType(ShortTermMemoryTtlUpdate updateType) {
+ switch (updateType) {
+ case ON_CREATE_AND_WRITE:
+ return StateTtlConfig.UpdateType.OnCreateAndWrite;
+ case ON_READ_AND_WRITE:
+ return StateTtlConfig.UpdateType.OnReadAndWrite;
+ default:
+ throw new IllegalArgumentException("Unsupported TTL update type: " + updateType);
+ }
+ }
+
+ private StateTtlConfig.StateVisibility toFlinkStateVisibility(
+ ShortTermMemoryTtlVisibility stateVisibility) {
+ switch (stateVisibility) {
+ case NEVER_RETURN_EXPIRED:
+ return StateTtlConfig.StateVisibility.NeverReturnExpired;
+ case RETURN_EXPIRED_IF_NOT_CLEANED_UP:
+ return StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported TTL state visibility: " + stateVisibility);
+ }
+ }
Review Comment:
Makes sense to me. Thanks for the clarification.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]