da-daken commented on code in PR #657:
URL: https://github.com/apache/flink-agents/pull/657#discussion_r3319489845


##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java:
##########
@@ -121,6 +131,63 @@ void initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext
                                 PENDING_INPUT_EVENT_STATE_NAME, TypeInformation.of(Event.class)));
     }
 
+    /**
+     * When {@link AgentExecutionOptions#SHORT_TERM_MEMORY_STATE_TTL_MS} is positive, attaches Flink
+     * {@link StateTtlConfig} to the short-term memory {@link MapStateDescriptor}. Unset, null, or
+     * non-positive values disable TTL (Flink does not allow zero/negative TTL).
+     */
+    private void maybeEnableShortTermMemoryTTL(
+            MapStateDescriptor<String, MemoryObjectImpl.MemoryItem> descriptor,
+            AgentPlan agentPlan) {
+        Long ttlMs =
+                agentPlan.getConfig().get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS);
+        if (ttlMs == null || ttlMs <= 0) {
+            return;
+        }
+
+        ShortTermMemoryTtlUpdate updateType =
+                agentPlan
+                        .getConfig()
+                        .get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE);
+
+        ShortTermMemoryTtlVisibility stateVisibility =
+                agentPlan
+                        .getConfig()
+                        .get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY);
+
+        StateTtlConfig ttlConfig =
+                StateTtlConfig.newBuilder(Duration.ofMillis(ttlMs))
+                        .setUpdateType(toFlinkUpdateType(updateType))
+                        .setStateVisibility(toFlinkStateVisibility(stateVisibility))
+                        .cleanupFullSnapshot()
+                        .build();
+        descriptor.enableTimeToLive(ttlConfig);
+    }
+
+    private StateTtlConfig.UpdateType toFlinkUpdateType(ShortTermMemoryTtlUpdate updateType) {
+        switch (updateType) {
+            case ON_CREATE_AND_WRITE:
+                return StateTtlConfig.UpdateType.OnCreateAndWrite;
+            case ON_READ_AND_WRITE:
+                return StateTtlConfig.UpdateType.OnReadAndWrite;
+            default:
+                throw new IllegalArgumentException("Unsupported TTL update type: " + updateType);
+        }
+    }
+
+    private StateTtlConfig.StateVisibility toFlinkStateVisibility(
+            ShortTermMemoryTtlVisibility stateVisibility) {
+        switch (stateVisibility) {
+            case NEVER_RETURN_EXPIRED:
+                return StateTtlConfig.StateVisibility.NeverReturnExpired;
+            case RETURN_EXPIRED_IF_NOT_CLEANED_UP:
+                return StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp;
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported TTL state visibility: " + stateVisibility);
+        }
+    }

Review Comment:
   Hi @xintongsong, I'd prefer not to put the mapping methods on the public enum types.

   > Are we comfortable exposing StateTtlConfig.UpdateType and StateTtlConfig.StateVisibility
   > directly in the public API? Two of the new options take Flink internal enums as their value
   > type, in a class under org.apache.flink.agents.api.agents. That couples the public surface to
   > Flink's state API forever, and makes the Python mirror painful (we'd need to redefine the
   > enums on the Python side or fall back to strings).
   
   As mentioned in the earlier review from @weiqingy, exposing Flink's StateTtlConfig.UpdateType
   and StateTtlConfig.StateVisibility directly would couple the Agents public API to Flink's state
   APIs and make the Python mirror harder. If ShortTermMemoryTtlUpdate or
   ShortTermMemoryTtlVisibility exposed toFlink...() methods, we would effectively reintroduce
   that coupling through the enum API.
   
   I agree the mapping does not have to stay in OperatorStateManager. A good compromise would be
   to move it into a runtime-internal helper, for example:
   ```java
   final class ShortTermMemoryTtlConfigMapper {
       static StateTtlConfig.UpdateType toFlinkUpdateType(ShortTermMemoryTtlUpdate updateType) {
           ...
       }

       static StateTtlConfig.StateVisibility toFlinkStateVisibility(
               ShortTermMemoryTtlVisibility visibility) {
           ...
       }
   }
   ```
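
To make the proposal concrete, here is a minimal, self-contained sketch of how such a helper could look. It is an illustration under assumptions, not the PR's actual code: `FlinkUpdateType` and `FlinkStateVisibility` are hypothetical local stand-ins for Flink's `StateTtlConfig.UpdateType` and `StateTtlConfig.StateVisibility`, declared only so the example compiles without Flink on the classpath, and the two `ShortTermMemoryTtl*` enums are redeclared here with the constant names that appear in the diff. The `Objects.requireNonNull` guard and the `TtlMapperDemo` class are additions for illustration. In the real module the helper would reference the Flink types directly.

```java
import java.util.Objects;

// Hypothetical stand-ins for StateTtlConfig.UpdateType / StateTtlConfig.StateVisibility,
// declared locally only so this sketch compiles without a Flink dependency.
enum FlinkUpdateType { OnCreateAndWrite, OnReadAndWrite }
enum FlinkStateVisibility { NeverReturnExpired, ReturnExpiredIfNotCleanedUp }

// Flink-free option enums; constant names are taken from the diff under review.
enum ShortTermMemoryTtlUpdate { ON_CREATE_AND_WRITE, ON_READ_AND_WRITE }
enum ShortTermMemoryTtlVisibility { NEVER_RETURN_EXPIRED, RETURN_EXPIRED_IF_NOT_CLEANED_UP }

/** Package-private helper: the Flink mapping stays out of the public enum API. */
final class ShortTermMemoryTtlConfigMapper {
    private ShortTermMemoryTtlConfigMapper() {}

    static FlinkUpdateType toFlinkUpdateType(ShortTermMemoryTtlUpdate updateType) {
        // Fail with a clear message instead of a bare NPE from the switch (assumed guard).
        Objects.requireNonNull(updateType, "updateType");
        switch (updateType) {
            case ON_CREATE_AND_WRITE:
                return FlinkUpdateType.OnCreateAndWrite;
            case ON_READ_AND_WRITE:
                return FlinkUpdateType.OnReadAndWrite;
            default:
                throw new IllegalArgumentException("Unsupported TTL update type: " + updateType);
        }
    }

    static FlinkStateVisibility toFlinkStateVisibility(ShortTermMemoryTtlVisibility visibility) {
        Objects.requireNonNull(visibility, "visibility");
        switch (visibility) {
            case NEVER_RETURN_EXPIRED:
                return FlinkStateVisibility.NeverReturnExpired;
            case RETURN_EXPIRED_IF_NOT_CLEANED_UP:
                return FlinkStateVisibility.ReturnExpiredIfNotCleanedUp;
            default:
                throw new IllegalArgumentException(
                        "Unsupported TTL state visibility: " + visibility);
        }
    }
}

/** Small demo entry point; prints the mapped values for each public option. */
final class TtlMapperDemo {
    public static void main(String[] args) {
        for (ShortTermMemoryTtlUpdate u : ShortTermMemoryTtlUpdate.values()) {
            System.out.println(u + " -> " + ShortTermMemoryTtlConfigMapper.toFlinkUpdateType(u));
        }
        for (ShortTermMemoryTtlVisibility v : ShortTermMemoryTtlVisibility.values()) {
            System.out.println(
                    v + " -> " + ShortTermMemoryTtlConfigMapper.toFlinkStateVisibility(v));
        }
    }
}
```

With this shape, `OperatorStateManager` would only call the two static methods when building the `StateTtlConfig`, and the public enums would carry no reference to Flink types, which also keeps a Python mirror of the options straightforward.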
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
