joeyutong opened a new issue, #706:
URL: https://github.com/apache/flink-agents/issues/706

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.
   
   ### Description
   
   `FlinkAgentsMetricGroupImpl` has two related thread-safety concerns around 
counters/meters and metric-group access from async execution paths.
   
   1. Java-side metric lookup/registration is not thread-safe.
   
   `FlinkAgentsMetricGroupImpl` lazily caches subgroups, counters, meters, 
histograms, and gauges in plain `HashMap`s. Accessors such as `getSubGroup`, 
`getCounter`, and `getMeter` use a check-then-put pattern without 
synchronization.
   
   This can race on first access to the same metric name. For example, two 
threads can both observe a missing counter and both call `super.counter(name)`. 
Flink's underlying metric group will keep the first registered metric and 
reject/report a name collision for the second one, but `counter(name, counter)` 
still returns the provided counter. If the second counter is stored in the 
`FlinkAgentsMetricGroupImpl` cache, later increments may go to a counter that 
is not reported.
   
   There is also a separate update-safety issue: `super.counter(name)` creates 
Flink's default `SimpleCounter`, which is not thread-safe, and `new 
MeterView(60)` internally uses a `SimpleCounter`. Concurrent `inc()` / 
`markEvent()` calls can lose updates.
   
   2. The context metric-group getter exposes the metric group to async code.
   
   The metric group is exposed through `RunnerContext#getAgentMetricGroup()` / 
`getActionMetricGroup()` and Python `ctx.agent_metric_group` / 
`ctx.action_metric_group`. Some built-in paths record metrics after async calls 
resume, but the public context getter allows user code or resources to capture 
the metric group and use it inside `durableExecuteAsync` / 
`durable_execute_async` callables.
   
   On the Java built-in chat path, token metrics are recorded inside 
`BaseChatModelConnection.recordTokenMetrics(...)`, and `ChatModelAction` can 
invoke `chatModel.chat(...)` through `ctx.durableExecuteAsync(...)`, so metric 
updates can happen in the async callable.
   
   Expected behavior:
   
   - Lazy metric lookup/registration should be race-free.
   - Counters/meters returned by the runtime metric group should be safe for 
supported async execution paths, or the metric-group API should explicitly 
enforce/document mailbox-thread-only usage.
   - The context metric-group getter should not accidentally expose a 
non-thread-safe facade to async callables without a clear contract or 
protection.
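
One way to enforce the "mailbox-thread-only" option in the second bullet is a guard that rejects calls from any other thread. The sketch below is only an illustration under assumptions: `ThreadConfinedCounter` and `ThreadConfinementDemo` are hypothetical names, the counter is a plain stand-in rather than Flink's `Counter`, and the "owner" thread here is simply whichever thread creates the counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in counter that may only be used from its owner thread. */
class ThreadConfinedCounter {
    private final Thread owner;
    private long count;

    ThreadConfinedCounter(Thread owner) {
        this.owner = owner;
    }

    void inc() {
        checkOwner();
        count++;
    }

    long getCount() {
        checkOwner();
        return count;
    }

    private void checkOwner() {
        // Fail fast instead of silently racing on the unsynchronized field.
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException(
                    "metric accessed from " + Thread.currentThread().getName()
                            + ", expected " + owner.getName());
        }
    }
}

class ThreadConfinementDemo {
    /** Returns true if the off-thread inc() was rejected and the count stayed at 1. */
    static boolean rejectedOffThread() {
        ThreadConfinedCounter counter = new ThreadConfinedCounter(Thread.currentThread());
        counter.inc(); // allowed: called on the owner thread

        AtomicBoolean rejected = new AtomicBoolean(false);
        Thread async = new Thread(() -> {
            try {
                counter.inc(); // simulates an update from an async callable
            } catch (IllegalStateException e) {
                rejected.set(true);
            }
        });
        async.start();
        try {
            async.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return rejected.get() && counter.getCount() == 1;
    }

    public static void main(String[] args) {
        System.out.println("off-thread update rejected: " + rejectedOffThread());
    }
}
```

A guard like this turns a silent lost update into an immediate failure, at the cost of breaking any existing path that already records metrics from an async callable.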
   
   Possible fix directions:
   
   - Synchronize all access to the internal `HashMap` caches, or use 
`ConcurrentHashMap` with `computeIfAbsent`.
   - Use a thread-safe counter implementation, e.g. `ThreadSafeSimpleCounter`, 
when counters/meters may be updated from async threads.
   - Clarify the contract of `getMeter(String name, Counter counter)` when 
multiple callers request the same meter name with different counters.
   - Decide whether the context metric-group getter should return a thread-safe 
facade or should be constrained to mailbox-thread access.
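
The first fix direction can be sketched with a small generic cache. This is a minimal illustration, not the project's code: `LazyMetricCache` and `LazyMetricCacheDemo` are hypothetical names, and the `registrar` function stands in for whatever call performs the real registration (for counters, presumably `super.counter(name)`).

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical lazy cache; M stands in for a counter, meter, or subgroup type. */
class LazyMetricCache<M> {
    private final Map<String, M> cache = new ConcurrentHashMap<>();
    private final Function<String, M> registrar;

    LazyMetricCache(Function<String, M> registrar) {
        this.registrar = registrar;
    }

    M get(String name) {
        // computeIfAbsent applies the registrar at most once per key, atomically.
        return cache.computeIfAbsent(name, registrar);
    }
}

class LazyMetricCacheDemo {
    /** Returns {number of registrations, number of distinct instances handed out}. */
    static int[] run(int threads) {
        AtomicInteger registrations = new AtomicInteger();
        LazyMetricCache<Object> cache = new LazyMetricCache<Object>(name -> {
            registrations.incrementAndGet(); // stands in for the real registration call
            return new Object();
        });

        Object[] seen = new Object[threads];
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            final int slot = i;
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line the threads up for the first lookup
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                seen[slot] = cache.get("promptTokens");
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // Object.equals is identity-based, so the set size counts distinct instances.
        int distinct = new HashSet<>(Arrays.asList(seen)).size();
        return new int[] {registrations.get(), distinct};
    }

    public static void main(String[] args) {
        int[] result = run(8);
        System.out.println("registrations=" + result[0] + ", distinct=" + result[1]);
    }
}
```

With this shape every caller receives the instance that was actually registered. One caveat from the `ConcurrentHashMap` contract: the mapping function should be short and must not update the same map while it runs.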
   
   ### How to reproduce
   
   A race can be demonstrated with concurrent first access to the same metric 
name:
   
   ```java
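   FlinkAgentsMetricGroupImpl group = new FlinkAgentsMetricGroupImpl(parentMetricGroup);
   
   // Both threads perform the first lookup of the same name, then increment.
   Runnable task = () -> {
       Counter counter = group.getCounter("promptTokens");
       counter.inc();
   };
   new Thread(task).start();
   new Thread(task).start();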
   ```
   
   Possible interleaving:
   
   1. Thread A calls `getCounter("promptTokens")` and sees the cache is empty.
   2. Thread B calls `getCounter("promptTokens")` and also sees the cache is 
empty.
   3. Thread A calls `super.counter("promptTokens")`; Flink registers counter A.
   4. Thread B calls `super.counter("promptTokens")`; Flink detects a name 
collision and does not report counter B, but returns counter B.
   5. Thread B stores counter B in the local cache.
   6. Later calls return counter B, so increments can go to an unreported 
counter.
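
The interleaving above can be replayed deterministically on a single thread. The sketch below uses only stand-ins: `DemoCounter`, `DemoParentGroup`, and `InterleavingReplay` are hypothetical, and `DemoParentGroup` merely imitates the behavior described in this issue (keep the first metric registered under a name, but return whatever counter was passed in). It also assumes Thread A's cache write lands before Thread B's.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in counter; not Flink's Counter interface. */
class DemoCounter {
    private long count;

    void inc() {
        count++;
    }

    long getCount() {
        return count;
    }
}

/** Hypothetical parent group: reports the first metric per name, returns the argument. */
class DemoParentGroup {
    private final Map<String, DemoCounter> reported = new HashMap<>();

    DemoCounter counter(String name, DemoCounter counter) {
        reported.putIfAbsent(name, counter); // a second registration is ignored
        return counter;                      // but the caller still gets its own counter
    }

    long reportedCount(String name) {
        return reported.get(name).getCount();
    }
}

class InterleavingReplay {
    /** Replays steps 1-6 in order and returns {reported count, cached count}. */
    static long[] replay() {
        DemoParentGroup parent = new DemoParentGroup();
        Map<String, DemoCounter> cache = new HashMap<>();
        String name = "promptTokens";

        boolean missForA = !cache.containsKey(name); // step 1: A sees an empty cache
        boolean missForB = !cache.containsKey(name); // step 2: B sees an empty cache
        DemoCounter a = missForA ? parent.counter(name, new DemoCounter()) : cache.get(name); // step 3
        DemoCounter b = missForB ? parent.counter(name, new DemoCounter()) : cache.get(name); // step 4
        cache.put(name, a); // A stores its counter (assumed to happen first)
        cache.put(name, b); // step 5: B overwrites the entry with counter B

        // step 6: later lookups return counter B
        cache.get(name).inc();
        cache.get(name).inc();
        return new long[] {parent.reportedCount(name), cache.get(name).getCount()};
    }

    public static void main(String[] args) {
        long[] result = replay();
        // prints "reported=0, cached=2"
        System.out.println("reported=" + result[0] + ", cached=" + result[1]);
    }
}
```

Both increments land on counter B, while the counter the parent group reports stays at zero.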
   
   A lost-update case can also happen if multiple async paths share the same 
default `SimpleCounter`:
   
   ```text
   count starts at 0
   Thread A reads 0
   Thread B reads 0
   Thread A writes 1
   Thread B writes 1
   ```
   
   After two increments, the visible count can be `1` instead of `2`.
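
For contrast, a counter backed by an atomic accumulator does not lose updates under the same access pattern. The sketch below is a stand-in built on `java.util.concurrent.atomic.LongAdder`; `AdderCounter` and `ConcurrentIncrementDemo` are hypothetical names and are not the `ThreadSafeSimpleCounter` mentioned in the fix directions.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical thread-safe counter backed by LongAdder. */
class AdderCounter {
    private final LongAdder adder = new LongAdder();

    void inc() {
        adder.increment();
    }

    long getCount() {
        return adder.sum();
    }
}

class ConcurrentIncrementDemo {
    /** Increments one shared counter from several threads and returns the final count. */
    static long run(int threads, int incrementsPerThread) {
        AdderCounter counter = new AdderCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.inc();
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // join makes every increment visible before sum()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.getCount();
    }

    public static void main(String[] args) {
        // prints "count=200000"
        System.out.println("count=" + run(4, 50_000));
    }
}
```

The same loop against a plain `long` field with `count++` may finish below the expected total, which is the lost-update case shown in the trace above.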
   
   The async API exposure can be exercised by using 
`ctx.getActionMetricGroup()` / `ctx.action_metric_group` inside a durable async 
callable, or by using Java chat models with async chat execution enabled, where 
token metrics are recorded inside the chat call path.
   
   ### Version and environment
   
   Observed in the current repository code on a `main`-line checkout, local 
commit `58a18b5bb84b476826f8c03d16a115537b92b0cb`.
   
   Relevant files:
   
   - `runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java`
   - `api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelConnection.java`
   - `plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java`
   - `python/flink_agents/runtime/flink_metric_group.py`
   - `python/flink_agents/runtime/flink_runner_context.py`
   
   Flink version in the root `pom.xml`: `2.2.0`.
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!
   


