xintongsong commented on code in PR #657:
URL: https://github.com/apache/flink-agents/pull/657#discussion_r3318942742
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java:
##########
@@ -121,6 +131,63 @@ void
initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext
PENDING_INPUT_EVENT_STATE_NAME,
TypeInformation.of(Event.class)));
}
+ /**
+ * When {@link AgentExecutionOptions#SHORT_TERM_MEMORY_STATE_TTL_MS} is
positive, attaches Flink
+ * {@link StateTtlConfig} to the short-term memory {@link
MapStateDescriptor}. Unset, null, or
+ * non-positive values disable TTL (Flink does not allow zero/negative
TTL).
+ */
+ private void maybeEnableShortTermMemoryTTL(
+ MapStateDescriptor<String, MemoryObjectImpl.MemoryItem> descriptor,
+ AgentPlan agentPlan) {
+ Long ttlMs =
+
agentPlan.getConfig().get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_MS);
+ if (ttlMs == null || ttlMs <= 0) {
+ return;
+ }
+
+ ShortTermMemoryTtlUpdate updateType =
+ agentPlan
+ .getConfig()
+
.get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_UPDATE_TYPE);
+
+ ShortTermMemoryTtlVisibility stateVisibility =
+ agentPlan
+ .getConfig()
+
.get(AgentExecutionOptions.SHORT_TERM_MEMORY_STATE_TTL_VISIBILITY);
+
+ StateTtlConfig ttlConfig =
+ StateTtlConfig.newBuilder(Duration.ofMillis(ttlMs))
+ .setUpdateType(toFlinkUpdateType(updateType))
+
.setStateVisibility(toFlinkStateVisibility(stateVisibility))
+ .cleanupFullSnapshot()
+ .build();
+ descriptor.enableTimeToLive(ttlConfig);
+ }
+
+ private StateTtlConfig.UpdateType
toFlinkUpdateType(ShortTermMemoryTtlUpdate updateType) {
+ switch (updateType) {
+ case ON_CREATE_AND_WRITE:
+ return StateTtlConfig.UpdateType.OnCreateAndWrite;
+ case ON_READ_AND_WRITE:
+ return StateTtlConfig.UpdateType.OnReadAndWrite;
+ default:
+ throw new IllegalArgumentException("Unsupported TTL update
type: " + updateType);
+ }
+ }
+
+ private StateTtlConfig.StateVisibility toFlinkStateVisibility(
+ ShortTermMemoryTtlVisibility stateVisibility) {
+ switch (stateVisibility) {
+ case NEVER_RETURN_EXPIRED:
+ return StateTtlConfig.StateVisibility.NeverReturnExpired;
+ case RETURN_EXPIRED_IF_NOT_CLEANED_UP:
+ return
StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp;
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported TTL state visibility: " +
stateVisibility);
+ }
+ }
Review Comment:
I suggest placing these two conversion methods in the enums they convert from,
i.e. `ShortTermMemoryTtlUpdate` and `ShortTermMemoryTtlVisibility`,
respectively. They are only related to the two enum types themselves, not to
where they are used.
##########
integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java:
##########
@@ -72,6 +74,7 @@
* public static ResourceDesc openAIResponses() {
* return
ResourceDescriptor.Builder.newBuilder(OpenAIResponsesModelConnection.class.getName())
* .addInitialArgument("api_key", System.getenv("OPENAI_API_KEY"))
+ * .addInitialArgument("api_base_url",
System.getenv("OPENAI_API_URL"))
Review Comment:
This change is unrelated to TTL. It should be placed in a separate commit.
In such cases, we usually add a `[hotfix]` commit that fixes the existing
issue, placed before the commits containing the actual PR changes.
##########
runtime/src/main/java/org/apache/flink/agents/runtime/operator/OperatorStateManager.java:
##########
@@ -121,6 +128,39 @@ void
initializeKeyedStates(org.apache.flink.api.common.functions.RuntimeContext
PENDING_INPUT_EVENT_STATE_NAME,
TypeInformation.of(Event.class)));
}
+ /**
+ * When {@link AgentExecutionOptions#SHORT_TERM_MEMORY_STATE_TTL_MS} is
positive, attaches Flink
+ * {@link StateTtlConfig} to the short-term memory {@link
MapStateDescriptor}. Unset, null, or
+ * non-positive values disable TTL (Flink does not allow zero/negative
TTL).
+ */
+ private void maybeEnableShortTermMemoryTTL(
+ MapStateDescriptor<String, MemoryObjectImpl.MemoryItem> descriptor,
+ AgentPlan agentPlan) {
Review Comment:
It's unnecessary to take the whole `AgentPlan` as an argument.
`AgentConfiguration` should be enough.
##########
runtime/src/test/java/org/apache/flink/agents/runtime/memory/ShortTermMemoryTTLIntegrationTest.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.runtime.memory;
+
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.agents.api.agents.AgentExecutionOptions;
+import org.apache.flink.agents.api.agents.ShortTermMemoryTtlUpdate;
+import org.apache.flink.agents.api.agents.ShortTermMemoryTtlVisibility;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.context.MemoryObject;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.plan.AgentConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Integration test for Short-Term Memory TTL functionality. */
+class ShortTermMemoryTTLIntegrationTest {
+
+ private static final String MEMORY_KEY = "test_key";
+
+ private static final class TestInput {
+ public String eventKey;
+ public long sleepMs;
+
+ private TestInput() {}
+
+ private TestInput(String eventKey, long sleepMs) {
+ this.eventKey = eventKey;
+ this.sleepMs = sleepMs;
+ }
+ }
+
+ public static class TTLTestAgent extends Agent {
+
+ @Action(listenEventTypes = {InputEvent.EVENT_TYPE})
+ public static void input(org.apache.flink.agents.api.Event event,
RunnerContext ctx)
+ throws Exception {
+ InputEvent inputEvent = (InputEvent) event;
+ TestInput input = (TestInput) inputEvent.getInput();
+
+ MemoryObject shortTermMemory = ctx.getShortTermMemory();
+ MemoryObject memoryObject = shortTermMemory.get(input.eventKey);
+
+ Object existingValue = null;
+ int currentCount = 0;
+ if (memoryObject != null && !memoryObject.isNestedObject()) {
+ existingValue = memoryObject.getValue();
+ if (existingValue instanceof Integer) {
+ currentCount = (Integer) existingValue;
+ } else if (existingValue instanceof Number) {
+ currentCount = ((Number) existingValue).intValue();
+ }
+ }
+
+ shortTermMemory.set(input.eventKey, currentCount + 1);
+ Thread.sleep(input.sleepMs);
+ ctx.sendEvent(
+ new OutputEvent(
+ input.eventKey + "|" + (existingValue == null ?
"NEW" : "EXISTING")));
+ }
+ }
+
+ @Test
+ void testTTLConfigurationNotApplied() throws Exception {
+ List<String> results = runScenario(1000L, 0L, true, true);
+
+ assertEquals(List.of("event1|NEW", "event2|NEW", "event1|EXISTING"),
results);
+ }
+
+ @Test
+ void testTTLConfigurationDisabledWithZeroTtl() throws Exception {
+ List<String> results = runScenario(0L, 2000L, true, true);
Review Comment:
Sleeping for 2s is unnecessarily long. If we expect the state to still exist
after the sleep, sleep for a shorter time (e.g., 50ms). If we expect the state
to no longer exist, we can set a small TTL (e.g., 50ms) and wait a bit longer
(e.g., 200ms). Either way, 2s is too long.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]