This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 4452abeb [api][runtime] Clean up resource when close. (#428)
4452abeb is described below

commit 4452abebe1f7e29e816fd25e633c62f2d32b8259
Author: Wenjin Xie <[email protected]>
AuthorDate: Wed Jan 14 17:37:17 2026 +0800

    [api][runtime] Clean up resource when close. (#428)
---
 .../api/chat/model/python/PythonChatModelConnection.java |  5 +++++
 .../model/python/PythonEmbeddingModelConnection.java     |  5 +++++
 .../org/apache/flink/agents/api/resource/Resource.java   |  3 +++
 .../anthropic/AnthropicChatModelConnection.java          |  5 +++++
 .../chatmodels/openai/OpenAIChatModelConnection.java     |  5 +++++
 .../apache/flink/agents/integrations/mcp/MCPPrompt.java  |  5 +++++
 .../apache/flink/agents/integrations/mcp/MCPTool.java    |  5 +++++
 .../elasticsearch/ElasticsearchVectorStore.java          |  5 +++++
 .../java/org/apache/flink/agents/plan/AgentPlan.java     |  9 +++++++++
 python/flink_agents/api/resource.py                      |  3 +++
 python/flink_agents/api/runner_context.py                |  4 ++++
 .../chat_models/anthropic/anthropic_chat_model.py        |  9 +++++++++
 .../integrations/chat_models/openai/openai_chat_model.py |  9 +++++++++
 .../embedding_models/openai_embedding_model.py           | 12 +++++++++++-
 python/flink_agents/integrations/mcp/mcp.py              | 16 ++++++++++++++++
 python/flink_agents/plan/agent_plan.py                   |  8 ++++++++
 python/flink_agents/runtime/flink_runner_context.py      | 16 +++++++++++++++-
 python/flink_agents/runtime/java/java_chat_model.py      |  4 ++++
 .../flink_agents/runtime/java/java_resource_wrapper.py   |  4 ++++
 python/flink_agents/runtime/local_runner.py              | 10 +++++++++-
 .../flink/agents/runtime/context/RunnerContextImpl.java  |  2 ++
 .../agents/runtime/operator/ActionExecutionOperator.java |  7 +++++++
 .../runtime/python/utils/PythonActionExecutor.java       | 11 +++++++++++
 23 files changed, 159 insertions(+), 3 deletions(-)

diff --git a/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java b/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
index 7cae38bd..42018f6d 100644
--- a/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
+++ b/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
@@ -87,4 +87,9 @@ public class PythonChatModelConnection extends BaseChatModelConnection
         Object pythonMessageResponse = adapter.callMethod(chatModel, "chat", kwargs);
         return adapter.fromPythonChatMessage(pythonMessageResponse);
     }
+
+    @Override
+    public void close() throws Exception {
+        this.chatModel.close();
+    }
 }
diff --git a/api/src/main/java/org/apache/flink/agents/api/embedding/model/python/PythonEmbeddingModelConnection.java b/api/src/main/java/org/apache/flink/agents/api/embedding/model/python/PythonEmbeddingModelConnection.java
index b4ff933a..f91896d1 100644
--- a/api/src/main/java/org/apache/flink/agents/api/embedding/model/python/PythonEmbeddingModelConnection.java
+++ b/api/src/main/java/org/apache/flink/agents/api/embedding/model/python/PythonEmbeddingModelConnection.java
@@ -124,4 +124,9 @@ public class PythonEmbeddingModelConnection extends BaseEmbeddingModelConnection
     public Object getPythonResource() {
         return embeddingModel;
     }
+
+    @Override
+    public void close() throws Exception {
+        this.embeddingModel.close();
+    }
 }
diff --git a/api/src/main/java/org/apache/flink/agents/api/resource/Resource.java b/api/src/main/java/org/apache/flink/agents/api/resource/Resource.java
index 78b15ad0..d386ca6d 100644
--- a/api/src/main/java/org/apache/flink/agents/api/resource/Resource.java
+++ b/api/src/main/java/org/apache/flink/agents/api/resource/Resource.java
@@ -64,4 +64,7 @@ public abstract class Resource {
     protected FlinkAgentsMetricGroup getMetricGroup() {
         return metricGroup;
     }
+
+    /** Close the resource. */
+    public void close() throws Exception {}
 }
diff --git a/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java b/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
index 49fbef3e..6dded957 100644
--- a/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
+++ b/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
@@ -111,6 +111,11 @@ public class AnthropicChatModelConnection extends BaseChatModelConnection {
         this.client = builder.build();
     }
 
+    @Override
+    public void close() {
+        this.client.close();
+    }
+
     @Override
     public ChatMessage chat(
             List<ChatMessage> messages,
diff --git a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
index 2675d424..b04cd2b2 100644
--- a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
+++ b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
@@ -451,4 +451,9 @@ public class OpenAIChatModelConnection extends BaseChatModelConnection {
         }
         return mapper.convertValue(value, MAP_TYPE);
     }
+
+    @Override
+    public void close() throws Exception {
+        this.client.close();
+    }
 }
diff --git a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPPrompt.java b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPPrompt.java
index 8e5c6828..5fd9085a 100644
--- a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPPrompt.java
+++ b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPPrompt.java
@@ -227,4 +227,9 @@ public class MCPPrompt extends Prompt {
     public String toString() {
         return String.format("MCPPrompt{name='%s', server='%s'}", name, mcpServer.getEndpoint());
     }
+
+    @Override
+    public void close() throws Exception {
+        this.mcpServer.close();
+    }
 }
diff --git a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPTool.java b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPTool.java
index 0ad179ae..286fb2d4 100644
--- a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPTool.java
+++ b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPTool.java
@@ -125,4 +125,9 @@ public class MCPTool extends Tool {
         return String.format(
                 "MCPTool{name='%s', server='%s'}", metadata.getName(), mcpServer.getEndpoint());
     }
+
+    @Override
+    public void close() throws Exception {
+        this.mcpServer.close();
+    }
 }
diff --git a/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java b/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
index cf09d55f..ec620bc6 100644
--- a/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
+++ b/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
@@ -249,6 +249,11 @@ public class ElasticsearchVectorStore extends BaseVectorStore
         this.client = new ElasticsearchClient(transport);
     }
 
+    @Override
+    public void close() throws Exception {
+        this.client.close();
+    }
+
     @Override
     public Collection getOrCreateCollection(String name, Map<String, Object> metadata)
             throws Exception {
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index b27525b8..6526f5a4 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -212,6 +212,15 @@ public class AgentPlan implements Serializable {
         return config.getConfData();
     }
 
+    public void close() throws Exception {
+        for (Map<String, Resource> resources : resourceCache.values()) {
+            for (Resource resource : resources.values()) {
+                resource.close();
+            }
+        }
+        resourceCache.clear();
+    }
+
     private void writeObject(ObjectOutputStream out) throws IOException {
         String serializedStr = new ObjectMapper().writeValueAsString(this);
         out.writeUTF(serializedStr);
diff --git a/python/flink_agents/api/resource.py b/python/flink_agents/api/resource.py
index 090690e0..ff9be303 100644
--- a/python/flink_agents/api/resource.py
+++ b/python/flink_agents/api/resource.py
@@ -90,6 +90,9 @@ class Resource(BaseModel, ABC):
         """
         return self._metric_group
 
+    def close(self) -> None:
+        """Close the resource."""
+
 
 class SerializableResource(Resource, ABC):
     """Resource which is serializable."""
diff --git a/python/flink_agents/api/runner_context.py b/python/flink_agents/api/runner_context.py
index 43d115a4..5461e7fc 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/runner_context.py
@@ -231,3 +231,7 @@ class RunnerContext(ABC):
         ReadableConfiguration
             The configuration for flink agents.
         """
+
+    @abstractmethod
+    def close(self) -> None:
+        """Clean up the resources."""
diff --git a/python/flink_agents/integrations/chat_models/anthropic/anthropic_chat_model.py b/python/flink_agents/integrations/chat_models/anthropic/anthropic_chat_model.py
index 297fb5e7..e0d1233a 100644
--- a/python/flink_agents/integrations/chat_models/anthropic/anthropic_chat_model.py
+++ b/python/flink_agents/integrations/chat_models/anthropic/anthropic_chat_model.py
@@ -22,6 +22,7 @@ from anthropic import Anthropic
 from anthropic._types import NOT_GIVEN
 from anthropic.types import MessageParam, TextBlockParam, ToolParam
 from pydantic import Field, PrivateAttr
+from typing_extensions import override
 
 from flink_agents.api.chat_message import ChatMessage, MessageRole
 from flink_agents.api.chat_models.chat_model import (
@@ -208,6 +209,14 @@ class AnthropicChatModelConnection(BaseChatModelConnection):
                 content=message.content[0].text,
             )
 
+    @override
+    def close(self) -> None:
+        if self._client is not None:
+            try:
+                self._client.close()
+            finally:
+                self._client = None
+
 
 DEFAULT_ANTHROPIC_MODEL = "claude-sonnet-4-20250514"
 DEFAULT_MAX_TOKENS = 1024
diff --git a/python/flink_agents/integrations/chat_models/openai/openai_chat_model.py b/python/flink_agents/integrations/chat_models/openai/openai_chat_model.py
index a00bbaea..b5e3297a 100644
--- a/python/flink_agents/integrations/chat_models/openai/openai_chat_model.py
+++ b/python/flink_agents/integrations/chat_models/openai/openai_chat_model.py
@@ -20,6 +20,7 @@ from typing import Any, Dict, List, Literal, Sequence
 import httpx
 from openai import NOT_GIVEN, OpenAI
 from pydantic import Field, PrivateAttr
+from typing_extensions import override
 
 from flink_agents.api.chat_message import ChatMessage
 from flink_agents.api.chat_models.chat_model import (
@@ -184,6 +185,14 @@ class OpenAIChatModelConnection(BaseChatModelConnection):
 
         return convert_from_openai_message(message)
 
+    @override
+    def close(self) -> None:
+        if self._client is not None:
+            try:
+                self._client.close()
+            finally:
+                self._client = None
+
 
 DEFAULT_TEMPERATURE = 0.1
 
diff --git a/python/flink_agents/integrations/embedding_models/openai_embedding_model.py b/python/flink_agents/integrations/embedding_models/openai_embedding_model.py
index 73cf65ff..234ad8d1 100644
--- a/python/flink_agents/integrations/embedding_models/openai_embedding_model.py
+++ b/python/flink_agents/integrations/embedding_models/openai_embedding_model.py
@@ -19,6 +19,7 @@ from typing import Any, Dict, Sequence
 
 from openai import NOT_GIVEN, OpenAI
 from pydantic import Field
+from typing_extensions import override
 
 from flink_agents.api.embedding_models.embedding_model import (
     BaseEmbeddingModelConnection,
@@ -94,7 +95,7 @@ class OpenAIEmbeddingModelConnection(BaseEmbeddingModelConnection):
             **kwargs,
         )
 
-    __client: OpenAI = None
+    __client: OpenAI | None = None
 
     @property
     def client(self) -> OpenAI:
@@ -130,6 +131,15 @@ class OpenAIEmbeddingModelConnection(BaseEmbeddingModelConnection):
         embeddings = [list(embedding.embedding) for embedding in response.data]
         return embeddings[0] if isinstance(text, str) else embeddings
 
+    @override
+    def close(self) -> None:
+        """Do nothing."""
+        if self.__client is not None:
+            try:
+                self.__client.close()
+            finally:
+                self.__client = None
+
 
 class OpenAIEmbeddingModelSetup(BaseEmbeddingModelSetup):
     """The settings for OpenAI embedding model.
diff --git a/python/flink_agents/integrations/mcp/mcp.py b/python/flink_agents/integrations/mcp/mcp.py
index 32ac4059..f430cbe0 100644
--- a/python/flink_agents/integrations/mcp/mcp.py
+++ b/python/flink_agents/integrations/mcp/mcp.py
@@ -67,6 +67,14 @@ class MCPTool(Tool):
             self.mcp_server.call_tool_async(self.metadata.name, *args, **kwargs)
         )
 
+    @override
+    def close(self) -> None:
+        if self.mcp_server is not None:
+            try:
+                self.mcp_server.close()
+            finally:
+                self.mcp_server = None
+
 
 class MCPPrompt(Prompt):
     """MCP prompt definition that extends the base Prompt class.
@@ -117,6 +125,13 @@ class MCPPrompt(Prompt):
         arguments = self._check_arguments(**arguments)
         return self.mcp_server.get_prompt(self.name, arguments)
 
+    @override
+    def close(self) -> None:
+        if self.mcp_server is not None:
+            try:
+                self.mcp_server.close()
+            finally:
+                self.mcp_server = None
 
 class MCPServer(SerializableResource, ABC):
     """Resource representing an MCP server and exposing its tools/prompts.
@@ -308,6 +323,7 @@ class MCPServer(SerializableResource, ABC):
 
         return chat_messages
 
+    @override
     def close(self) -> None:
         """Close the MCP server connection and clean up resources."""
         asyncio.run(self._cleanup_connection())
diff --git a/python/flink_agents/plan/agent_plan.py b/python/flink_agents/plan/agent_plan.py
index 98de608d..b9180ca1 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -223,6 +223,14 @@ class AgentPlan(BaseModel):
         """Set java resource adapter for java resource provider."""
         self.__j_resource_adapter = j_resource_adapter
 
+    def close(self) -> None:
+        """Clean up the resources."""
+        for type in self.__resources:
+            for name in self.__resources[type]:
+                self.__resources[type][name].close()
+        self.__resources.clear()
+
+
 
 def _get_actions(agent: Agent) -> List[Action]:
     """Extract all registered agent actions from an agent.
diff --git a/python/flink_agents/runtime/flink_runner_context.py b/python/flink_agents/runtime/flink_runner_context.py
index 7c89ef1c..d9ac02a4 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -46,7 +46,7 @@ class FlinkRunnerContext(RunnerContext):
     This context allows access to event handling.
     """
 
-    __agent_plan: AgentPlan
+    __agent_plan: AgentPlan | None
     __ltm: BaseLongTermMemory = None
 
     def __init__(
@@ -209,6 +209,14 @@ class FlinkRunnerContext(RunnerContext):
         """
         return self.__agent_plan.config
 
+    @override
+    def close(self) -> None:
+        if self.__agent_plan is not None:
+            try:
+                self.__agent_plan.close()
+            finally:
+                self.__agent_plan = None
+
 
 def create_flink_runner_context(
     j_runner_context: Any,
@@ -247,6 +255,12 @@ def flink_runner_context_switch_action_context(
             )
         )
 
+def close_flink_runner_context(
+    ctx: FlinkRunnerContext,
+) -> None:
+    """Clean up the resources kept by the flink runner context."""
+    ctx.close()
+
 
 def create_async_thread_pool() -> ThreadPoolExecutor:
     """Used to create a thread pool to execute asynchronous
diff --git a/python/flink_agents/runtime/java/java_chat_model.py b/python/flink_agents/runtime/java/java_chat_model.py
index 1d432246..2263b68e 100644
--- a/python/flink_agents/runtime/java/java_chat_model.py
+++ b/python/flink_agents/runtime/java/java_chat_model.py
@@ -81,6 +81,10 @@ class JavaChatModelConnectionImpl(JavaChatModelConnection):
 
         return from_java_chat_message(j_response_message)
 
+    @override
+    def close(self) -> None:
+        self.j_resource.close()
+
 
 class JavaChatModelSetupImpl(JavaChatModelSetup):
     """Java-based implementation of ChatModelSetup that bridges Python and Java chat
diff --git a/python/flink_agents/runtime/java/java_resource_wrapper.py b/python/flink_agents/runtime/java/java_resource_wrapper.py
index 496aab33..56a101ca 100644
--- a/python/flink_agents/runtime/java/java_resource_wrapper.py
+++ b/python/flink_agents/runtime/java/java_resource_wrapper.py
@@ -62,6 +62,10 @@ class JavaPrompt(Prompt):
                                             extra_args=j_chat_message.getExtraArgs()) for j_chat_message in j_chat_messages]
         return chatMessages
 
+    @override
+    def close(self) -> None:
+        self.j_prompt.close()
+
 class JavaGetResourceWrapper:
     """Python wrapper for Java ResourceAdapter."""
 
diff --git a/python/flink_agents/runtime/local_runner.py b/python/flink_agents/runtime/local_runner.py
index 084f7bd0..5ed6157d 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -57,7 +57,7 @@ class LocalRunnerContext(RunnerContext):
         Name of the action being executed.
     """
 
-    __agent_plan: AgentPlan
+    __agent_plan: AgentPlan | None
     __key: Any
     events: deque[Event]
     action_name: str
@@ -221,6 +221,14 @@ class LocalRunnerContext(RunnerContext):
         """Clean up sensory memory."""
         self._sensory_mem_store.clear()
 
+    def close(self) -> None:
+        """Cleanup the resource."""
+        if self.__agent_plan is not None:
+            try:
+                self.__agent_plan.close()
+            finally:
+                self.__agent_plan = None
+
 
 class LocalRunner(AgentRunner):
     """Agent runner implementation for local execution, which is
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index 549359f3..2b8d8dc5 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -231,6 +231,8 @@ public class RunnerContextImpl implements RunnerContext {
             this.ltm.close();
             this.ltm = null;
         }
+
+        this.agentPlan.close();
     }
 
     public String getActionName() {
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 70a49c0e..1b569ac8 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -643,6 +643,13 @@ public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT
 
     @Override
     public void close() throws Exception {
+        if (runnerContext != null) {
+            try {
+                runnerContext.close();
+            } finally {
+                runnerContext = null;
+            }
+        }
         if (pythonActionExecutor != null) {
             pythonActionExecutor.close();
         }
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 4d7792aa..3f9ad702 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -43,6 +43,9 @@ public class PythonActionExecutor {
     private static final String FLINK_RUNNER_CONTEXT_SWITCH_ACTION_CONTEXT =
             "flink_runner_context.flink_runner_context_switch_action_context";
 
+    private static final String CLOSE_FLINK_RUNNER_CONTEXT =
+            "flink_runner_context.close_flink_runner_context";
+
     // ========== ASYNC THREAD POOL ===========
     private static final String CREATE_ASYNC_THREAD_POOL =
             "flink_runner_context.create_async_thread_pool";
@@ -176,6 +179,14 @@ public class PythonActionExecutor {
             if (pythonAsyncThreadPool != null) {
                 interpreter.invoke(CLOSE_ASYNC_THREAD_POOL, pythonAsyncThreadPool);
             }
+
+            if (pythonRunnerContext != null) {
+                try {
+                    interpreter.invoke(CLOSE_FLINK_RUNNER_CONTEXT, pythonRunnerContext);
+                } finally {
+                    pythonRunnerContext = null;
+                }
+            }
         }
     }
 

Reply via email to