da-daken commented on code in PR #657:
URL: https://github.com/apache/flink-agents/pull/657#discussion_r3319489845
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java:
##########
@@ -121,6 +131,63 @@ void
initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext
PENDING_INPUT_EVENT_STATE_NAME,
TypeInformation.of(Event.class)));
}
+ /**
+ * When {@link AgentExecutionOptions#SHORT_TERM_MEMORY_STATE_TTL_MS} is
positive, attaches Flink
+ * {@link StateTtlConfig} to the short-term memory {@link
MapStateDescriptor}. Unset, null, or
+ * non-positive values disable TTL (Flink does not allow zero/negative
TTL).
+ */
+ private void maybeEnableShortTermMemoryTTL(
+ MapStateDescriptor<String, MemoryObjectImpl.MemoryItem> descriptor,
+ AgentPlan agentPlan) {
+ Long ttlMs =
+
agentPlan.getConfig().get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS);
+ if (ttlMs == null || ttlMs <= 0) {
+ return;
+ }
+
+ ShortTermMemoryTtlUpdate updateType =
+ agentPlan
+ .getConfig()
+
.get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE);
+
+ ShortTermMemoryTtlVisibility stateVisibility =
+ agentPlan
+ .getConfig()
+
.get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY);
+
+ StateTtlConfig ttlConfig =
+ StateTtlConfig.newBuilder(Duration.ofMillis(ttlMs))
+ .setUpdateType(toFlinkUpdateType(updateType))
+
.setStateVisibility(toFlinkStateVisibility(stateVisibility))
+ .cleanupFullSnapshot()
+ .build();
+ descriptor.enableTimeToLive(ttlConfig);
+ }
+
+ private StateTtlConfig.UpdateType
toFlinkUpdateType(ShortTermMemoryTtlUpdate updateType) {
+ switch (updateType) {
+ case ON_CREATE_AND_WRITE:
+ return StateTtlConfig.UpdateType.OnCreateAndWrite;
+ case ON_READ_AND_WRITE:
+ return StateTtlConfig.UpdateType.OnReadAndWrite;
+ default:
+ throw new IllegalArgumentException("Unsupported TTL update
type: " + updateType);
+ }
+ }
+
+ private StateTtlConfig.StateVisibility toFlinkStateVisibility(
+ ShortTermMemoryTtlVisibility stateVisibility) {
+ switch (stateVisibility) {
+ case NEVER_RETURN_EXPIRED:
+ return StateTtlConfig.StateVisibility.NeverReturnExpired;
+ case RETURN_EXPIRED_IF_NOT_CLEANED_UP:
+ return
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported TTL state visibility: " +
stateVisibility);
+ }
+ }
Review Comment:
hi @xintongsong ,I’d prefer not to put the mapping methods on the public
enum types.
> Are we comfortable exposing StateTtlConfig.UpdateType and
StateTtlConfig.StateVisibility directly in the public API? Two of the new
options take Flink internal enums as their value type, in a class under
org.apache.flink.agents.api.agents. That couples the public surface to Flink's
state API forever, and makes the Python mirror painful (we'd need to redefine
the enums on the Python side or fall back to strings).
As mentioned in the earlier review from @weiqingy , exposing Flink’s
StateTtlConfig.UpdateType /
StateTtlConfig.StateVisibility directly would couple the Agents public API
to Flink state APIs and
make the Python mirror harder. If ShortTermMemoryTtlUpdate or
ShortTermMemoryTtlVisibility exposes
toFlink...() methods, we effectively reintroduce that coupling through the
enum API.
I agree the mapping does not have to stay in OperatorStateManager.A good
compromise would be to move it into a runtime-internal helper, for example:
```java
final class ShortTermMemoryTtlConfigMapper {
static StateTtlConfig.UpdateType
toFlinkUpdateType(ShortTermMemoryTtlUpdate updateType) {
...
}
static StateTtlConfig.StateVisibility toFlinkStateVisibility(
ShortTermMemoryTtlVisibility visibility) {
...
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]