This is an automated email from the ASF dual-hosted git repository.
xintongsong pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/release-0.2 by this push:
new 24931d33 [python][plan] Make AgentPlan.getResource thread-safe (backport #548 to release-0.2) (#727)
24931d33 is described below
commit 24931d3373199935cc61ccc34ac7a29c21a31749
Author: Weiqing Yang <[email protected]>
AuthorDate: Wed Jun 3 01:55:18 2026 -0700
[python][plan] Make AgentPlan.getResource thread-safe (backport #548 to release-0.2) (#727)
---
.../org/apache/flink/agents/plan/AgentPlan.java | 7 ++-
.../apache/flink/agents/plan/AgentPlanTest.java | 55 ++++++++++++++++++++++
python/flink_agents/plan/agent_plan.py | 35 +++++++++-----
python/flink_agents/plan/tests/test_agent_plan.py | 46 +++++++++++++++++-
4 files changed, 130 insertions(+), 13 deletions(-)
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index ea4a4258..451e2432 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -223,12 +223,17 @@ public class AgentPlan implements Serializable {
/**
* Get resource from agent plan.
*
+ * <p>Synchronized so the check-create-cache sequence is atomic: concurrent first accesses to
+ * the same uncached resource invoke the provider once and share a single instance. The
+ * intrinsic lock is reentrant, so the nested resolution callback below can call back into this
+ * method on the same thread.
+ *
* @param name the resource name
* @param type the resource type
* @return the resource instance
* @throws Exception if the resource cannot be found or created
*/
- public Resource getResource(String name, ResourceType type) throws Exception {
+ public synchronized Resource getResource(String name, ResourceType type) throws Exception {
// Check cache first
if (resourceCache.containsKey(type) && resourceCache.get(type).containsKey(name)) {
return resourceCache.get(type).get(name);
diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
index 74ddda6b..81a55eb9 100644
--- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
+++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
@@ -46,8 +46,16 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import pemja.core.object.PyObject;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import static org.assertj.core.api.Assertions.assertThat;
@@ -480,6 +488,53 @@ public class AgentPlanTest {
assertThat(myTool).isSameAs(myToolAgain);
}
+ @Test
+ void getResourceShouldCreateOnlyOneInstanceUnderConcurrentAccess() throws Exception {
+ AtomicInteger created = new AtomicInteger();
+
+ ResourceProvider provider =
+ new ResourceProvider("shared-tool", ResourceType.TOOL) {
+ @Override
+ public Resource provide(BiFunction<String, ResourceType, Resource> getResource)
+ throws Exception {
+ // Sleep so both callers enter getResource before either caches.
+ Thread.sleep(200);
+ created.incrementAndGet();
+ return new Resource() {
+ @Override
+ public ResourceType getResourceType() {
+ return ResourceType.TOOL;
+ }
+ };
+ }
+ };
+
+ Map<ResourceType, Map<String, ResourceProvider>> providers = new HashMap<>();
+ providers.put(ResourceType.TOOL, Map.of("shared-tool", provider));
+ AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>(), providers);
+
+ CyclicBarrier start = new CyclicBarrier(2);
+ ExecutorService pool = Executors.newFixedThreadPool(2);
+ try {
+ Callable<Resource> task =
+ () -> {
+ start.await();
+ return plan.getResource("shared-tool", ResourceType.TOOL);
+ };
+
+ Future<Resource> first = pool.submit(task);
+ Future<Resource> second = pool.submit(task);
+
+ Resource firstResource = first.get(10, TimeUnit.SECONDS);
+ Resource secondResource = second.get(10, TimeUnit.SECONDS);
+
+ assertThat(firstResource).isSameAs(secondResource);
+ assertThat(created.get()).isEqualTo(1);
+ } finally {
+ pool.shutdownNow();
+ }
+ }
+
@Test
public void testAddResourceRegistersEmbeddingModelProvider() throws Exception {
Agent agent = new Agent();
diff --git a/python/flink_agents/plan/agent_plan.py b/python/flink_agents/plan/agent_plan.py
index daf66128..3e717e39 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -15,6 +15,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
+import threading
+from functools import cached_property
from typing import TYPE_CHECKING, Any, Dict, List, cast
from pydantic import BaseModel, field_serializer, model_validator
@@ -62,6 +64,14 @@ class AgentPlan(BaseModel):
__resources: Dict[ResourceType, Dict[str, Resource]] = {}
__j_resource_adapter: Any = None
+ @cached_property
+ def _resource_lock(self) -> threading.RLock:
+ # Guards the lazy check-create-cache in get_resource so concurrent first
+ # accesses to the same uncached resource create a single instance.
+ # Reentrant because resource resolution can call back into get_resource
+ # on the same thread (e.g. a chat model setup resolving its connection).
+ return threading.RLock()
+
@field_serializer("resource_providers")
def __serialize_resource_providers(
self, providers: Dict[ResourceType, Dict[str, ResourceProvider]]
@@ -209,17 +219,20 @@ class AgentPlan(BaseModel):
type : ResourceType
The type of the resource.
"""
- if type not in self.__resources:
- self.__resources[type] = {}
- if name not in self.__resources[type]:
- resource_provider = self.resource_providers[type][name]
- if isinstance(resource_provider, JavaResourceProvider):
- resource_provider.set_java_resource_adapter(self.__j_resource_adapter)
- resource = resource_provider.provide(
- get_resource=self.get_resource, config=self.config
- )
- self.__resources[type][name] = resource
- return self.__resources[type][name]
+ with self._resource_lock:
+ if type not in self.__resources:
+ self.__resources[type] = {}
+ if name not in self.__resources[type]:
+ resource_provider = self.resource_providers[type][name]
+ if isinstance(resource_provider, JavaResourceProvider):
+ resource_provider.set_java_resource_adapter(
+ self.__j_resource_adapter
+ )
+ resource = resource_provider.provide(
+ get_resource=self.get_resource, config=self.config
+ )
+ self.__resources[type][name] = resource
+ return self.__resources[type][name]
def set_java_resource_adapter(self, j_resource_adapter: Any) -> None:
"""Set java resource adapter for java resource provider."""
diff --git a/python/flink_agents/plan/tests/test_agent_plan.py b/python/flink_agents/plan/tests/test_agent_plan.py
index 6050c485..0abc8b1b 100644
--- a/python/flink_agents/plan/tests/test_agent_plan.py
+++ b/python/flink_agents/plan/tests/test_agent_plan.py
@@ -16,6 +16,8 @@
# limitations under the License.
#################################################################################
import json
+import threading
+import time
from pathlib import Path
from typing import Any, Dict, List, Sequence
@@ -36,7 +38,7 @@ from flink_agents.api.embedding_models.embedding_model import (
BaseEmbeddingModelSetup,
)
from flink_agents.api.events.event import Event, InputEvent, OutputEvent
-from flink_agents.api.resource import ResourceDescriptor, ResourceType
+from flink_agents.api.resource import Resource, ResourceDescriptor, ResourceType
from flink_agents.api.runner_context import RunnerContext
from flink_agents.api.vector_stores.vector_store import (
BaseVectorStore,
@@ -45,6 +47,7 @@ from flink_agents.api.vector_stores.vector_store import (
from flink_agents.plan.agent_plan import AgentPlan
from flink_agents.plan.configuration import AgentConfiguration
from flink_agents.plan.function import PythonFunction
+from flink_agents.plan.resource_provider import ResourceProvider
class AgentForTest(Agent): # noqa D101
@@ -262,6 +265,47 @@ def test_get_resource() -> None: # noqa: D103
)
+def test_get_resource_single_instance_under_concurrent_access() -> None:
+ """Concurrent first access to an uncached resource must create one instance."""
+ created: List[int] = []
+
+ class _DummyResource(Resource):
+ @classmethod
+ def resource_type(cls) -> ResourceType:
+ return ResourceType.TOOL
+
+ class _SlowCountingProvider(ResourceProvider):
+ def provide(self, get_resource: Any, config: Any) -> Resource:
+ # Widen the race window so both callers enter provide() before
+ # either caches the result.
+ time.sleep(0.2)
+ created.append(1)
+ return _DummyResource()
+
+ provider = _SlowCountingProvider(name="shared-tool", type=ResourceType.TOOL)
+ agent_plan = AgentPlan(
+ actions={},
+ actions_by_event={},
+ resource_providers={ResourceType.TOOL: {"shared-tool": provider}},
+ )
+
+ barrier = threading.Barrier(2)
+ results: List[Resource] = []
+
+ def task() -> None:
+ barrier.wait()
+ results.append(agent_plan.get_resource("shared-tool", ResourceType.TOOL))
+
+ threads = [threading.Thread(target=task) for _ in range(2)]
+ for thread in threads:
+ thread.start()
+ for thread in threads:
+ thread.join(timeout=10)
+
+ assert len(created) == 1
+ assert results[0] is results[1]
+
+
def test_add_action_and_resource_to_agent() -> None: # noqa: D103
my_agent = Agent()
my_agent.add_action(