This is an automated email from the ASF dual-hosted git repository.

xintongsong pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 24931d33 [python][plan] Make AgentPlan.getResource thread-safe 
(backport #548 to release-0.2) (#727)
24931d33 is described below

commit 24931d3373199935cc61ccc34ac7a29c21a31749
Author: Weiqing Yang <[email protected]>
AuthorDate: Wed Jun 3 01:55:18 2026 -0700

    [python][plan] Make AgentPlan.getResource thread-safe (backport #548 to 
release-0.2) (#727)
---
 .../org/apache/flink/agents/plan/AgentPlan.java    |  7 ++-
 .../apache/flink/agents/plan/AgentPlanTest.java    | 55 ++++++++++++++++++++++
 python/flink_agents/plan/agent_plan.py             | 35 +++++++++-----
 python/flink_agents/plan/tests/test_agent_plan.py  | 46 +++++++++++++++++-
 4 files changed, 130 insertions(+), 13 deletions(-)

diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java 
b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index ea4a4258..451e2432 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -223,12 +223,17 @@ public class AgentPlan implements Serializable {
     /**
      * Get resource from agent plan.
      *
+     * <p>Synchronized so the check-create-cache sequence is atomic: 
concurrent first accesses to
+     * the same uncached resource invoke the provider once and share a single 
instance. The
+     * intrinsic lock is reentrant, so the nested resolution callback below 
can call back into this
+     * method on the same thread.
+     *
      * @param name the resource name
      * @param type the resource type
      * @return the resource instance
      * @throws Exception if the resource cannot be found or created
      */
-    public Resource getResource(String name, ResourceType type) throws 
Exception {
+    public synchronized Resource getResource(String name, ResourceType type) 
throws Exception {
         // Check cache first
         if (resourceCache.containsKey(type) && 
resourceCache.get(type).containsKey(name)) {
             return resourceCache.get(type).get(name);
diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
index 74ddda6b..81a55eb9 100644
--- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
+++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
@@ -46,8 +46,16 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import pemja.core.object.PyObject;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -480,6 +488,53 @@ public class AgentPlanTest {
         assertThat(myTool).isSameAs(myToolAgain);
     }
 
+    // Two threads request the same uncached resource simultaneously; the provider
+    // must run exactly once and both callers must receive the same instance.
+    @Test
+    void getResourceShouldCreateOnlyOneInstanceUnderConcurrentAccess() throws 
Exception {
+        // Counts provider invocations; expected to end at exactly 1.
+        AtomicInteger created = new AtomicInteger();
+
+        ResourceProvider provider =
+                new ResourceProvider("shared-tool", ResourceType.TOOL) {
+                    @Override
+                    public Resource provide(BiFunction<String, ResourceType, 
Resource> getResource)
+                            throws Exception {
+                        // Widen the race window: without the lock on getResource,
+                        // both callers would reach provide() before either caches.
+                        Thread.sleep(200);
+                        created.incrementAndGet();
+                        return new Resource() {
+                            @Override
+                            public ResourceType getResourceType() {
+                                return ResourceType.TOOL;
+                            }
+                        };
+                    }
+                };
+
+        Map<ResourceType, Map<String, ResourceProvider>> providers = new 
HashMap<>();
+        providers.put(ResourceType.TOOL, Map.of("shared-tool", provider));
+        AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>(), 
providers);
+
+        // Releases both tasks at the same moment to maximise overlap.
+        CyclicBarrier start = new CyclicBarrier(2);
+        ExecutorService pool = Executors.newFixedThreadPool(2);
+        try {
+            Callable<Resource> task =
+                    () -> {
+                        start.await();
+                        return plan.getResource("shared-tool", 
ResourceType.TOOL);
+                    };
+
+            Future<Resource> first = pool.submit(task);
+            Future<Resource> second = pool.submit(task);
+
+            // Bounded waits: a deadlock fails the test instead of hanging it.
+            Resource firstResource = first.get(10, TimeUnit.SECONDS);
+            Resource secondResource = second.get(10, TimeUnit.SECONDS);
+
+            assertThat(firstResource).isSameAs(secondResource);
+            assertThat(created.get()).isEqualTo(1);
+        } finally {
+            // Attempts to interrupt any task still running if a wait or assert failed.
+            pool.shutdownNow();
+        }
+    }
+
     @Test
     public void testAddResourceRegistersEmbeddingModelProvider() throws 
Exception {
         Agent agent = new Agent();
diff --git a/python/flink_agents/plan/agent_plan.py 
b/python/flink_agents/plan/agent_plan.py
index daf66128..3e717e39 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -15,6 +15,8 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
+import threading
+from functools import cached_property
 from typing import TYPE_CHECKING, Any, Dict, List, cast
 
 from pydantic import BaseModel, field_serializer, model_validator
@@ -62,6 +64,14 @@ class AgentPlan(BaseModel):
     __resources: Dict[ResourceType, Dict[str, Resource]] = {}
     __j_resource_adapter: Any = None
 
+    @cached_property
+    def _resource_lock(self) -> threading.RLock:
+        # Per-instance lock guarding the lazy check-create-cache in get_resource,
+        # so concurrent first accesses to one uncached resource build one instance.
+        # Reentrant because provide() receives get_resource as a callback and may
+        # resolve dependencies on the same thread (e.g. a chat model setup
+        # resolving its connection).
+        # NOTE(review): functools.cached_property has no internal lock on Python
+        # 3.12+, so two threads making the very first access can each receive a
+        # different RLock; consider creating the lock eagerly (e.g. a pydantic
+        # PrivateAttr with default_factory=threading.RLock) -- TODO confirm.
+        return threading.RLock()
+
     @field_serializer("resource_providers")
     def __serialize_resource_providers(
         self, providers: Dict[ResourceType, Dict[str, ResourceProvider]]
@@ -209,17 +219,20 @@ class AgentPlan(BaseModel):
         type : ResourceType
             The type of the resource.
         """
-        if type not in self.__resources:
-            self.__resources[type] = {}
-        if name not in self.__resources[type]:
-            resource_provider = self.resource_providers[type][name]
-            if isinstance(resource_provider, JavaResourceProvider):
-                
resource_provider.set_java_resource_adapter(self.__j_resource_adapter)
-            resource = resource_provider.provide(
-                get_resource=self.get_resource, config=self.config
-            )
-            self.__resources[type][name] = resource
-        return self.__resources[type][name]
+        with self._resource_lock:
+            if type not in self.__resources:
+                self.__resources[type] = {}
+            if name not in self.__resources[type]:
+                resource_provider = self.resource_providers[type][name]
+                if isinstance(resource_provider, JavaResourceProvider):
+                    resource_provider.set_java_resource_adapter(
+                        self.__j_resource_adapter
+                    )
+                resource = resource_provider.provide(
+                    get_resource=self.get_resource, config=self.config
+                )
+                self.__resources[type][name] = resource
+            return self.__resources[type][name]
 
     def set_java_resource_adapter(self, j_resource_adapter: Any) -> None:
         """Set java resource adapter for java resource provider."""
diff --git a/python/flink_agents/plan/tests/test_agent_plan.py 
b/python/flink_agents/plan/tests/test_agent_plan.py
index 6050c485..0abc8b1b 100644
--- a/python/flink_agents/plan/tests/test_agent_plan.py
+++ b/python/flink_agents/plan/tests/test_agent_plan.py
@@ -16,6 +16,8 @@
 # limitations under the License.
 
#################################################################################
 import json
+import threading
+import time
 from pathlib import Path
 from typing import Any, Dict, List, Sequence
 
@@ -36,7 +38,7 @@ from flink_agents.api.embedding_models.embedding_model import 
(
     BaseEmbeddingModelSetup,
 )
 from flink_agents.api.events.event import Event, InputEvent, OutputEvent
-from flink_agents.api.resource import ResourceDescriptor, ResourceType
+from flink_agents.api.resource import Resource, ResourceDescriptor, 
ResourceType
 from flink_agents.api.runner_context import RunnerContext
 from flink_agents.api.vector_stores.vector_store import (
     BaseVectorStore,
@@ -45,6 +47,7 @@ from flink_agents.api.vector_stores.vector_store import (
 from flink_agents.plan.agent_plan import AgentPlan
 from flink_agents.plan.configuration import AgentConfiguration
 from flink_agents.plan.function import PythonFunction
+from flink_agents.plan.resource_provider import ResourceProvider
 
 
 class AgentForTest(Agent):  # noqa D101
@@ -262,6 +265,47 @@ def test_get_resource() -> None:  # noqa: D103
     )
 
 
+def test_get_resource_single_instance_under_concurrent_access() -> None:
+    """Concurrent first access to an uncached resource must create one instance.
+
+    Two threads released together by a barrier request the same uncached TOOL;
+    the provider must run exactly once and both threads must get the same object.
+    """
+    # One entry is appended per provide() call.
+    created: List[int] = []
+
+    # Minimal concrete Resource returned by the provider below.
+    class _DummyResource(Resource):
+        @classmethod
+        def resource_type(cls) -> ResourceType:
+            return ResourceType.TOOL
+
+    class _SlowCountingProvider(ResourceProvider):
+        def provide(self, get_resource: Any, config: Any) -> Resource:
+            # Widen the race window: without the lock in get_resource, both
+            # callers would enter provide() before either caches the result.
+            time.sleep(0.2)
+            created.append(1)
+            return _DummyResource()
+
+    provider = _SlowCountingProvider(name="shared-tool", 
type=ResourceType.TOOL)
+    agent_plan = AgentPlan(
+        actions={},
+        actions_by_event={},
+        resource_providers={ResourceType.TOOL: {"shared-tool": provider}},
+    )
+
+    # Releases both threads at the same moment to maximise overlap.
+    barrier = threading.Barrier(2)
+    results: List[Resource] = []
+
+    def task() -> None:
+        barrier.wait()
+        results.append(agent_plan.get_resource("shared-tool", 
ResourceType.TOOL))
+
+    threads = [threading.Thread(target=task) for _ in range(2)]
+    for thread in threads:
+        thread.start()
+    # NOTE(review): join(timeout=10) does not fail if a thread is still alive,
+    # and an exception raised inside task() is not propagated to this thread,
+    # so such failures only show up indirectly through the asserts below;
+    # consider asserting `not thread.is_alive()` and collecting exceptions.
+    for thread in threads:
+        thread.join(timeout=10)
+
+    assert len(created) == 1
+    assert results[0] is results[1]
+
+
 def test_add_action_and_resource_to_agent() -> None:  # noqa: D103
     my_agent = Agent()
     my_agent.add_action(

Reply via email to