joeyutong opened a new issue, #724: URL: https://github.com/apache/flink-agents/issues/724
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.

### Description

On the `release-0.2` branch, resource resolution is cached in `AgentPlan`, but `AgentPlan#getResource(String, ResourceType)` is not thread-safe.

The method currently performs a multi-step check/create/cache sequence without a shared lock:

1. check `resourceCache` for an existing resource
2. call `ResourceProvider#provide(...)` if missing
3. put the new resource into `resourceCache`

Because the check and put are not atomic, two concurrent first accesses to the same resource can both pass the cache check, both create a resource instance, and then race when storing the result.

That violates the expected cached-resource semantics. `AgentPlanTest#testGetResourceFromResourceProvider` already asserts that repeated `getResource(...)` calls should return the same resource instance, but the current implementation only guarantees that for sequential calls.

This can be reached from the public runtime path on JDK 21:

- `JavaActionTask` executes Java actions through `ContinuationActionExecutor`.
- `JavaRunnerContextImpl#durableExecuteAsync(...)` delegates to `ContinuationActionExecutor#executeAsync(...)` when a continuation context is present.
- The JDK 21 `ContinuationActionExecutor` implementation submits the callable to a fixed async thread pool.
- If the callable resolves a resource through `RunnerContext#getResource(...)`, `RunnerContextImpl#getResource(...)` delegates to `agentPlan.getResource(...)` on that async thread.

Potential symptoms include duplicate model/tool/connection initialization and duplicate registration of resource-side state such as metrics when two instances of the same logical resource are created concurrently.

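
One way to close the gap in the check/create/cache sequence is to run all three steps under a single lock. The following is a minimal sketch under stated assumptions, not the project's code: `SynchronizedCacheSketch`, its `get` method, and the `Supplier` factory are hypothetical stand-ins for `AgentPlan`, `getResource(...)`, and `ResourceProvider#provide(...)`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for the cache inside AgentPlan; not the project's actual class. */
class SynchronizedCacheSketch {
    private final Map<String, Object> cache = new HashMap<>();

    // One intrinsic lock covers check, create, and put, so a second caller
    // blocks until the first caller has stored its instance.
    synchronized Object get(String key, Supplier<Object> factory) {
        Object existing = cache.get(key);
        if (existing != null) {
            return existing;
        }
        Object created = factory.get();
        cache.put(key, created);
        return created;
    }

    /** Resolves the same key from two threads and returns how often the factory ran. */
    static int creationsUnderContention() {
        SynchronizedCacheSketch sketch = new SynchronizedCacheSketch();
        AtomicInteger created = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Runnable task = () -> {
            try {
                start.await(); // hold both threads until released together
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            sketch.get("shared-tool", () -> {
                created.incrementAndGet();
                return new Object();
            });
        };
        Thread a = new Thread(task);
        Thread b = new Thread(task);
        a.start();
        b.start();
        start.countDown();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return created.get();
    }
}
```

The sketch uses a plain `synchronized` method because the intrinsic lock is reentrant: a factory that resolves a dependency through `get(...)` on the same thread re-enters without deadlocking. `ConcurrentHashMap#computeIfAbsent` would also make the sequence atomic per key, but its mapping function must not update other mappings of the same map, which matters if a provider resolves nested resources through the callback it receives.
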
The issue appears to be incidentally covered on `main` by later refactoring commits rather than by a dedicated bug-fix commit:

- https://github.com/apache/flink-agents/commit/f8eac10b870ad62ab1799fe9d7d35cc25d40185c (`[plan] Extract ResourceCache and PythonResourceBridge from AgentPlan (#548)`) moved runtime resource caching out of `AgentPlan` into `ResourceCache` and introduced `public synchronized Resource getResource(...)`.
- https://github.com/apache/flink-agents/commit/487bd5844229f9b6712b56fac58a21a8026db62a (`[runtime] Own ResourceContext inside ResourceCache for shared lifecycle.`) later adjusted the same cache/lifecycle area while keeping the synchronized resource resolution path.

### How to reproduce

A small regression test on `release-0.2` can reproduce the race by resolving the same uncached resource from two threads. The sleep in the provider ensures that both callers enter `AgentPlan#getResource(...)` before either one stores the cache entry.

```java
@Test
void getResourceShouldCreateOnlyOneInstanceUnderConcurrentAccess() throws Exception {
    AtomicInteger created = new AtomicInteger();
    ResourceProvider provider =
            new ResourceProvider("shared-tool", ResourceType.TOOL) {
                @Override
                public Resource provide(BiFunction<String, ResourceType, Resource> getResource)
                        throws Exception {
                    // Slow creation widens the window between cache check and cache put.
                    Thread.sleep(200);
                    created.incrementAndGet();
                    return new Resource() {
                        @Override
                        public ResourceType getResourceType() {
                            return ResourceType.TOOL;
                        }
                    };
                }
            };

    Map<ResourceType, Map<String, ResourceProvider>> providers = new HashMap<>();
    providers.put(ResourceType.TOOL, Map.of("shared-tool", provider));
    AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>(), providers);

    // The barrier releases both threads at the same moment.
    CyclicBarrier start = new CyclicBarrier(2);
    ExecutorService pool = Executors.newFixedThreadPool(2);
    try {
        Callable<Resource> task =
                () -> {
                    start.await();
                    return plan.getResource("shared-tool", ResourceType.TOOL);
                };

        Future<Resource> first = pool.submit(task);
        Future<Resource> second = pool.submit(task);

        Resource firstResource = first.get(10, TimeUnit.SECONDS);
        Resource secondResource = second.get(10, TimeUnit.SECONDS);

        assertThat(firstResource).isSameAs(secondResource);
        assertThat(created.get()).isEqualTo(1);
    } finally {
        pool.shutdownNow();
    }
}
```

Expected behavior: the provider is invoked once and both callers receive the same cached resource.

Actual behavior on `release-0.2`: both callers can invoke the provider and receive different resource instances before the last writer wins in the cache.

### Version and environment

- Apache Flink Agents branch: `release-0.2`
- Verified against `release-0.2` at commit `1df9a1e503b9c062bbe1267d2d3c277ed676c686`
- Relevant runtime path: Java action execution with JDK 21 async continuation support
- No external model service is required to reproduce the core cache race; the issue can be reproduced with an in-memory `ResourceProvider` as shown above.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
