joeyutong opened a new issue, #724:
URL: https://github.com/apache/flink-agents/issues/724

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.
   
   ### Description
   
   On the `release-0.2` branch, resource resolution is cached in `AgentPlan`, but `AgentPlan#getResource(String, ResourceType)` is not thread-safe.
   
   The method currently performs a multi-step check/create/cache sequence without a shared lock:
   
   1. check `resourceCache` for an existing resource
   2. call `ResourceProvider#provide(...)` if missing
   3. put the new resource into `resourceCache`
   
   Because the check and put are not atomic, two concurrent first accesses to the same resource can both pass the cache check, both create a resource instance, and then race when storing the result. That violates the expected cached-resource semantics. `AgentPlanTest#testGetResourceFromResourceProvider` already asserts that repeated `getResource(...)` calls should return the same resource instance, but the current implementation only guarantees that for sequential calls.
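   To make the check/create/cache race concrete, here is a minimal, self-contained Java sketch. `NaiveResourceCache` is a hypothetical stand-in, not the `AgentPlan` code: it uses a plain `HashMap` and a `Supplier` instead of `ResourceProvider`. The provider blocks on a `CyclicBarrier` until both callers are inside it, which can only happen after both have missed the cache, so the duplicate creation is deterministic rather than timing-dependent.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.CyclicBarrier;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Supplier;

   /** Hypothetical, simplified model of an unsynchronized check/create/cache sequence. */
   class NaiveResourceCache {
       private final Map<String, Object> cache = new HashMap<>();

       // No lock: another thread can run between steps 1 and 3.
       Object getResource(String name, Supplier<Object> provider) {
           Object cached = cache.get(name); // 1. check the cache
           if (cached != null) {
               return cached;
           }
           Object created = provider.get(); // 2. create on a miss
           cache.put(name, created); // 3. store; the last writer wins
           return created;
       }

       /** Returns {instances created, 1 if the two callers got different objects else 0}. */
       static int[] raceTwoFirstAccesses() {
           NaiveResourceCache naive = new NaiveResourceCache();
           AtomicInteger created = new AtomicInteger();
           // The provider only proceeds once BOTH callers are inside it, i.e. both have
           // already missed in step 1. This makes the race deterministic for the demo.
           CyclicBarrier bothMissed = new CyclicBarrier(2);
           Supplier<Object> provider = () -> {
               try {
                   bothMissed.await(5, TimeUnit.SECONDS);
               } catch (Exception e) {
                   throw new IllegalStateException(e);
               }
               created.incrementAndGet();
               return new Object();
           };
           ExecutorService pool = Executors.newFixedThreadPool(2);
           try {
               Future<Object> a = pool.submit(() -> naive.getResource("shared-tool", provider));
               Future<Object> b = pool.submit(() -> naive.getResource("shared-tool", provider));
               Object first = a.get(10, TimeUnit.SECONDS);
               Object second = b.get(10, TimeUnit.SECONDS);
               return new int[] {created.get(), first != second ? 1 : 0};
           } catch (Exception e) {
               throw new IllegalStateException(e);
           } finally {
               pool.shutdownNow();
           }
       }
   }
   ```

   `raceTwoFirstAccesses()` returns `{2, 1}`: the provider runs twice and the two callers hold different objects, which is the pattern described above.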
   
   This can be reached from the public runtime path on JDK 21:
   
   - `JavaActionTask` executes Java actions through `ContinuationActionExecutor`.
   - `JavaRunnerContextImpl#durableExecuteAsync(...)` delegates to `ContinuationActionExecutor#executeAsync(...)` when a continuation context is present.
   - The JDK 21 `ContinuationActionExecutor` implementation submits the callable to a fixed async thread pool.
   - If the callable resolves a resource through `RunnerContext#getResource(...)`, `RunnerContextImpl#getResource(...)` delegates to `agentPlan.getResource(...)` on that async thread.
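   The runtime path above boils down to callables running concurrently on a shared fixed-size pool. The sketch below models only that part, under hypothetical names (`AsyncPathSketch`, `resolveResource`); it does not use `ContinuationActionExecutor` or any other flink-agents API. Two callables that each wait until the other has started can only complete if they run at the same time on different pool threads, which is the situation in which two `agentPlan.getResource(...)` calls can overlap.

   ```java
   import java.util.concurrent.CyclicBarrier;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;

   /** Hypothetical model of the async call path; none of these names are flink-agents APIs. */
   class AsyncPathSketch {
       /** Returns the names of the two pool threads that performed a lookup at the same time. */
       static String[] resolveFromTwoActions() {
           // Stands in for the fixed async thread pool used by the JDK 21 executor.
           ExecutorService asyncPool = Executors.newFixedThreadPool(2);
           // Each callable waits here until the other has also started, which can only
           // happen if both are running concurrently on different pool threads.
           CyclicBarrier bothRunning = new CyclicBarrier(2);
           try {
               Future<String> first = asyncPool.submit(() -> resolveResource(bothRunning));
               Future<String> second = asyncPool.submit(() -> resolveResource(bothRunning));
               return new String[] {
                   first.get(10, TimeUnit.SECONDS), second.get(10, TimeUnit.SECONDS)
               };
           } catch (Exception e) {
               throw new IllegalStateException(e);
           } finally {
               asyncPool.shutdownNow();
           }
       }

       // Placeholder for RunnerContext#getResource(...) -> AgentPlan#getResource(...):
       // it only records which thread would perform the lookup.
       private static String resolveResource(CyclicBarrier bothRunning) throws Exception {
           bothRunning.await(5, TimeUnit.SECONDS);
           return Thread.currentThread().getName();
       }
   }
   ```

   The two returned thread names differ from each other and from the submitting thread, so any state both lookups touch, such as a resource cache, is shared across threads.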
   
   Potential symptoms include duplicate model/tool/connection initialization and duplicate registration of resource-side state such as metrics when two instances of the same logical resource are created concurrently.
   
   The issue appears to have been fixed incidentally on `main` by later refactoring commits rather than by a dedicated bug-fix commit:
   
   - https://github.com/apache/flink-agents/commit/f8eac10b870ad62ab1799fe9d7d35cc25d40185c (`[plan] Extract ResourceCache and PythonResourceBridge from AgentPlan (#548)`) moved runtime resource caching out of `AgentPlan` into `ResourceCache` and introduced `public synchronized Resource getResource(...)`.
   - https://github.com/apache/flink-agents/commit/487bd5844229f9b6712b56fac58a21a8026db62a (`[runtime] Own ResourceContext inside ResourceCache for shared lifecycle.`) later adjusted the same cache/lifecycle area while keeping the synchronized resource resolution path.
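   The fix on `main` guards resolution with `synchronized`. Below is a minimal sketch of that approach, assuming a simplified model where resources are plain `Object`s and each provider receives a lookup callback (loosely mirroring the `BiFunction<String, ResourceType, Resource>` passed to `ResourceProvider#provide(...)`). `SynchronizedResourceCache` is hypothetical and is not the actual `ResourceCache` class.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Function;

   /** Hypothetical sketch of a synchronized check/create/cache sequence (not flink-agents code). */
   class SynchronizedResourceCache {
       private final Map<String, Object> cache = new HashMap<>();
       // Each provider gets a lookup callback so it can resolve its own dependencies.
       private final Map<String, Function<Function<String, Object>, Object>> providers;

       SynchronizedResourceCache(Map<String, Function<Function<String, Object>, Object>> providers) {
           this.providers = providers;
       }

       // One monitor guards check, create and put, so a resource is created at most once.
       // Java monitors are reentrant, so a provider that calls back into getResource on
       // the same thread does not deadlock.
       synchronized Object getResource(String name) {
           Object cached = cache.get(name);
           if (cached != null) {
               return cached;
           }
           Function<Function<String, Object>, Object> provider = providers.get(name);
           if (provider == null) {
               throw new IllegalArgumentException("No provider for " + name);
           }
           Object created = provider.apply(this::getResource);
           cache.put(name, created);
           return created;
       }

       /** Resolves "tool" (which depends on "connection") from several threads at once. */
       static int[] resolveToolConcurrently(int threads) {
           AtomicInteger connections = new AtomicInteger();
           AtomicInteger tools = new AtomicInteger();
           Map<String, Function<Function<String, Object>, Object>> providers = new HashMap<>();
           providers.put("connection", lookup -> {
               connections.incrementAndGet();
               return new Object();
           });
           providers.put("tool", lookup -> {
               Object connection = lookup.apply("connection"); // nested resolution
               tools.incrementAndGet();
               return List.of(connection); // hypothetical "tool" holding its connection
           });
           SynchronizedResourceCache resources = new SynchronizedResourceCache(providers);

           ExecutorService pool = Executors.newFixedThreadPool(threads);
           CountDownLatch start = new CountDownLatch(1);
           try {
               List<Future<Object>> results = new ArrayList<>();
               for (int i = 0; i < threads; i++) {
                   results.add(pool.submit(() -> {
                       start.await(); // release all callers together
                       return resources.getResource("tool");
                   }));
               }
               start.countDown();
               Object first = results.get(0).get(10, TimeUnit.SECONDS);
               int mismatches = 0;
               for (Future<Object> result : results) {
                   if (result.get(10, TimeUnit.SECONDS) != first) {
                       mismatches++;
                   }
               }
               return new int[] {connections.get(), tools.get(), mismatches};
           } catch (Exception e) {
               throw new IllegalStateException(e);
           } finally {
               pool.shutdownNow();
           }
       }
   }
   ```

   `resolveToolConcurrently(8)` returns `{1, 1, 0}`: each provider runs once and every caller gets the same instance. The trade-off is that a single lock also serializes first-time creation of unrelated resources. A cache built on `ConcurrentHashMap#computeIfAbsent` would be a poorer fit for nested resolution, because that method's contract says the mapping function must not attempt to update any other mappings of the map.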
   
   ### How to reproduce
   
   A small regression test on `release-0.2` can reproduce the race by resolving the same uncached resource from two threads. The provider sleep makes both callers enter `AgentPlan#getResource(...)` before either one stores the cache entry.
   
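   ```java
   // JDK, JUnit 5 and AssertJ imports. AgentPlan, Resource, ResourceProvider and
   // ResourceType come from the flink-agents modules on release-0.2.
   import static org.assertj.core.api.Assertions.assertThat;

   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.Callable;
   import java.util.concurrent.CyclicBarrier;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.BiFunction;
   import org.junit.jupiter.api.Test;

   @Test
   void getResourceShouldCreateOnlyOneInstanceUnderConcurrentAccess() throws Exception {
       AtomicInteger created = new AtomicInteger();

       ResourceProvider provider =
               new ResourceProvider("shared-tool", ResourceType.TOOL) {
                   @Override
                   public Resource provide(BiFunction<String, ResourceType, Resource> getResource)
                           throws Exception {
                       // Widen the window between the cache miss and the cache put.
                       Thread.sleep(200);
                       created.incrementAndGet();
                       return new Resource() {
                           @Override
                           public ResourceType getResourceType() {
                               return ResourceType.TOOL;
                           }
                       };
                   }
               };

       Map<ResourceType, Map<String, ResourceProvider>> providers = new HashMap<>();
       providers.put(ResourceType.TOOL, Map.of("shared-tool", provider));
       AgentPlan plan = new AgentPlan(new HashMap<>(), new HashMap<>(), providers);

       // Release both callers at the same moment so their first accesses overlap.
       CyclicBarrier start = new CyclicBarrier(2);
       ExecutorService pool = Executors.newFixedThreadPool(2);
       try {
           Callable<Resource> task =
                   () -> {
                       start.await();
                       return plan.getResource("shared-tool", ResourceType.TOOL);
                   };

           Future<Resource> first = pool.submit(task);
           Future<Resource> second = pool.submit(task);

           Resource firstResource = first.get(10, TimeUnit.SECONDS);
           Resource secondResource = second.get(10, TimeUnit.SECONDS);

           // A correct cache invokes the provider once and hands out one instance.
           assertThat(firstResource).isSameAs(secondResource);
           assertThat(created.get()).isEqualTo(1);
       } finally {
           pool.shutdownNow();
       }
   }
   ```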
   
   Expected behavior: the provider is invoked once and both callers receive the same cached resource.
   
   Actual behavior on `release-0.2`: both callers can invoke the provider and receive different resource instances, and whichever caller stores its result last wins the cache entry.
   
   ### Version and environment
   
   - Apache Flink Agents branch: `release-0.2`
   - Verified against `release-0.2` at commit `1df9a1e503b9c062bbe1267d2d3c277ed676c686`
   - Relevant runtime path: Java action execution with JDK 21 async continuation support
   - No external model service is required to reproduce the core cache race; the issue can be reproduced with an in-memory `ResourceProvider` as shown above.
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!
   