xintongsong commented on code in PR #657:
URL: https://github.com/apache/flink-agents/pull/657#discussion_r3321638837


##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java:
##########
@@ -121,6 +131,63 @@ void 
initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext
                                 PENDING_INPUT_EVENT_STATE_NAME, 
TypeInformation.of(Event.class)));
     }
 
+    /**
+     * When {@link AgentExecutionOptions#SHORT_TERM_MEMORY_STATE_TTL_MS} is 
positive, attaches Flink
+     * {@link StateTtlConfig} to the short-term memory {@link 
MapStateDescriptor}. Unset, null, or
+     * non-positive values disable TTL (Flink does not allow zero/negative 
TTL).
+     */
+    private void maybeEnableShortTermMemoryTTL(
+            MapStateDescriptor<String, MemoryObjectImpl.MemoryItem> descriptor,
+            AgentPlan agentPlan) {
+        Long ttlMs =
+                
agentPlan.getConfig().get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS);
+        if (ttlMs == null || ttlMs <= 0) {
+            return;
+        }
+
+        ShortTermMemoryTtlUpdate updateType =
+                agentPlan
+                        .getConfig()
+                        
.get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE);
+
+        ShortTermMemoryTtlVisibility stateVisibility =
+                agentPlan
+                        .getConfig()
+                        
.get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY);
+
+        StateTtlConfig ttlConfig =
+                StateTtlConfig.newBuilder(Duration.ofMillis(ttlMs))
+                        .setUpdateType(toFlinkUpdateType(updateType))
+                        
.setStateVisibility(toFlinkStateVisibility(stateVisibility))
+                        .cleanupFullSnapshot()
+                        .build();
+        descriptor.enableTimeToLive(ttlConfig);
+    }
+
+    private StateTtlConfig.UpdateType 
toFlinkUpdateType(ShortTermMemoryTtlUpdate updateType) {
+        switch (updateType) {
+            case ON_CREATE_AND_WRITE:
+                return StateTtlConfig.UpdateType.OnCreateAndWrite;
+            case ON_READ_AND_WRITE:
+                return StateTtlConfig.UpdateType.OnReadAndWrite;
+            default:
+                throw new IllegalArgumentException("Unsupported TTL update 
type: " + updateType);
+        }
+    }
+
+    private StateTtlConfig.StateVisibility toFlinkStateVisibility(
+            ShortTermMemoryTtlVisibility stateVisibility) {
+        switch (stateVisibility) {
+            case NEVER_RETURN_EXPIRED:
+                return StateTtlConfig.StateVisibility.NeverReturnExpired;
+            case RETURN_EXPIRED_IF_NOT_CLEANED_UP:
+                return 
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported TTL state visibility: " + 
stateVisibility);
+        }
+    }

Review Comment:
   Makes sense to me. Thanks for the clarification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to