joeyutong opened a new issue, #706: URL: https://github.com/apache/flink-agents/issues/706
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.

### Description

`FlinkAgentsMetricGroupImpl` has two related thread-safety concerns around counters/meters and metric-group access from async execution paths.

1. Java-side metric lookup/registration is not thread-safe.

   `FlinkAgentsMetricGroupImpl` lazily caches subgroups, counters, meters, histograms, and gauges in plain `HashMap`s. Accessors such as `getSubGroup`, `getCounter`, and `getMeter` use a check-then-put pattern without synchronization.

   This can race on first access to the same metric name. For example, two threads can both observe a missing counter and both call `super.counter(name)`. Flink's underlying metric group will keep the first registered metric and reject/report a name collision for the second one, but `counter(name, counter)` still returns the provided counter. If the second counter is stored in the `FlinkAgentsMetricGroupImpl` cache, later increments may go to a counter that is not reported.

   There is also a separate update-safety issue: `super.counter(name)` creates Flink's default `SimpleCounter`, which is not thread-safe, and `new MeterView(60)` internally uses a `SimpleCounter`. Concurrent `inc()` / `markEvent()` calls can lose updates.

2. The context metric-group getter exposes the metric group to async code.

   The metric group is exposed through `RunnerContext#getAgentMetricGroup()` / `getActionMetricGroup()` and Python `ctx.agent_metric_group` / `ctx.action_metric_group`. Some built-in paths record metrics after async calls resume, but the public context getter allows user code or resources to capture the metric group and use it inside `durableExecuteAsync` / `durable_execute_async` callables.

   On the Java built-in chat path, token metrics are recorded inside `BaseChatModelConnection.recordTokenMetrics(...)`, and `ChatModelAction` can invoke `chatModel.chat(...)` through `ctx.durableExecuteAsync(...)`, so metric updates can happen in the async callable.

Expected behavior:

- Lazy metric lookup/registration should be race-free.
- Counters/meters returned by the runtime metric group should be safe for supported async execution paths, or the metric-group API should explicitly enforce/document mailbox-thread-only usage.
- The context metric-group getter should not accidentally expose a non-thread-safe facade to async callables without a clear contract or protection.

Possible fix directions:

- Synchronize all access to the internal `HashMap` caches, or use `ConcurrentHashMap` with `computeIfAbsent`.
- Use a thread-safe counter implementation, e.g. `ThreadSafeSimpleCounter`, when counters/meters may be updated from async threads.
- Clarify the contract of `getMeter(String name, Counter counter)` when multiple callers request the same meter name with different counters.
- Decide whether the context metric-group getter should return a thread-safe facade or should be constrained to mailbox-thread access.

### How to reproduce

A race can be demonstrated with concurrent first access to the same metric name:

```java
FlinkAgentsMetricGroupImpl group = new FlinkAgentsMetricGroupImpl(parentMetricGroup);

// Run concurrently from two threads.
Counter counter = group.getCounter("promptTokens");
counter.inc();
```

Possible interleaving:

1. Thread A calls `getCounter("promptTokens")` and sees the cache is empty.
2. Thread B calls `getCounter("promptTokens")` and also sees the cache is empty.
3. Thread A calls `super.counter("promptTokens")`; Flink registers counter A.
4. Thread B calls `super.counter("promptTokens")`; Flink detects a name collision and does not report counter B, but returns counter B.
5. Thread B stores counter B in the local cache.
6. Later calls return counter B, so increments can go to an unreported counter.

A lost-update case can also happen if multiple async paths share the same default `SimpleCounter`:

```text
count starts at 0
Thread A reads 0
Thread B reads 0
Thread A writes 1
Thread B writes 1
```

After two increments, the visible count can be `1` instead of `2`.

The async API exposure can be exercised by using `ctx.getActionMetricGroup()` / `ctx.action_metric_group` inside a durable async callable, or by using Java chat models with async chat execution enabled, where token metrics are recorded inside the chat call path.

### Version and environment

Observed in the current repository code on a `main`-line checkout, local commit `58a18b5bb84b476826f8c03d16a115537b92b0cb`.

Relevant files:

- `runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java`
- `api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelConnection.java`
- `plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java`
- `python/flink_agents/runtime/flink_metric_group.py`
- `python/flink_agents/runtime/flink_runner_context.py`

Flink version in the root `pom.xml`: `2.2.0`.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
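
For reference, the first two fix directions listed in the description (a `ConcurrentHashMap` with `computeIfAbsent`, plus a thread-safe counter) might look roughly like the sketch below. It is a standalone illustration, not the actual `FlinkAgentsMetricGroupImpl` code: `SafeCounterCache` and `runConcurrently` are hypothetical names, and `LongAdder` stands in for a thread-safe counter such as `ThreadSafeSimpleCounter`. In the real class, the factory passed to `computeIfAbsent` would presumably be the registration call against the parent metric group.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the metric cache; not the real FlinkAgentsMetricGroupImpl.
class SafeCounterCache {
    // ConcurrentHashMap makes "look up or create" a single atomic step per key.
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    // computeIfAbsent applies the factory at most once per name, so all callers
    // share one instance. Assumption: in the real class the factory would
    // register a thread-safe counter with the parent metric group.
    LongAdder getCounter(String name) {
        return counters.computeIfAbsent(name, n -> new LongAdder());
    }

    // Starts `threads` workers that race on first access and then increment.
    // Returns the final count observed after all workers have finished.
    static long runConcurrently(int threads, int incrementsPerThread) {
        SafeCounterCache cache = new SafeCounterCache();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // line all workers up on the first lookup
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                LongAdder counter = cache.getCounter("promptTokens");
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.increment(); // safe under contention, unlike a plain long field
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return cache.getCounter("promptTokens").sum();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(8, 10_000));
    }
}
```

With this shape, both the duplicate-registration race and the lost-update case go away for counters: every thread sees the same instance, and the total equals `threads * incrementsPerThread`. The sketch does not address the open contract question for `getMeter(String name, Counter counter)` or whether the context getter should be restricted to the mailbox thread.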
