This is an automated email from the ASF dual-hosted git repository.

yunfengzhou-hub pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new e94da402 [metric] Add key-value metric group for model and action 
dimensions (#760)
e94da402 is described below

commit e94da402cc106c5a1f7593d805cff5efb6ef6888
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Fri Jun 5 10:41:28 2026 +0800

    [metric] Add key-value metric group for model and action dimensions (#760)
---
 .../agents/api/chat/model/BaseChatModelSetup.java  |   2 +-
 .../agents/api/metrics/FlinkAgentsMetricGroup.java |   9 +
 .../model/BaseChatModelSetupTokenMetricsTest.java  | 109 +++++++++-
 docs/content/docs/operations/monitoring.md         |  14 +-
 .../pom.xml                                        |  19 ++
 .../integration/test/TokenMetricsE2EAgent.java     |  76 +++++++
 .../integration/test/TokenMetricsE2ETest.java      | 230 +++++++++++++++++++++
 .../flink/agents/plan/actions/ChatModelAction.java |   2 +-
 .../plan/actions/ChatModelActionRetryTest.java     |   7 +-
 python/flink_agents/api/chat_models/chat_model.py  |   2 +-
 .../api/chat_models/tests/test_token_metrics.py    |  17 +-
 python/flink_agents/api/metric_group.py            |  12 +-
 .../flink_agents/plan/actions/chat_model_action.py |   2 +-
 .../tests/actions/test_chat_model_action_retry.py  |  11 +-
 python/flink_agents/runtime/flink_metric_group.py  |  10 +-
 .../runtime/memory/mem0/mem0_long_term_memory.py   |   2 +-
 .../mem0/tests/test_mem0_long_term_memory.py       |  13 +-
 .../agents/runtime/context/RunnerContextImpl.java  |   2 +-
 .../agents/runtime/metrics/BuiltInMetrics.java     |   2 +-
 .../metrics/FlinkAgentsMetricGroupImpl.java        |  15 ++
 .../metrics/FlinkAgentsMetricGroupImplTest.java    |  38 ++++
 21 files changed, 549 insertions(+), 45 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetup.java
 
b/api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetup.java
index 3a9c7b2d..34b2b299 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetup.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetup.java
@@ -120,7 +120,7 @@ public abstract class BaseChatModelSetup extends Resource {
         if (metricGroup == null) {
             return;
         }
-        FlinkAgentsMetricGroup modelGroup = metricGroup.getSubGroup(modelName);
+        FlinkAgentsMetricGroup modelGroup = metricGroup.getSubGroup("model", 
modelName);
         modelGroup.getCounter("promptTokens").inc(promptTokens);
         modelGroup.getCounter("completionTokens").inc(completionTokens);
     }
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java
 
b/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java
index 9ab3df3d..fa194c91 100644
--- 
a/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java
+++ 
b/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java
@@ -37,6 +37,15 @@ public interface FlinkAgentsMetricGroup {
      */
     FlinkAgentsMetricGroup getSubGroup(String name);
 
+    /**
+     * Create or retrieve a key-value sub-metric group.
+     *
+     * @param key The key of the metric group variable.
+     * @param value The value of the metric group variable.
+     * @return the sub-metric group instance.
+     */
+    FlinkAgentsMetricGroup getSubGroup(String key, String value);
+
     /**
      * Create or retrieve a gauge with the given name.
      *
diff --git 
a/api/src/test/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetupTokenMetricsTest.java
 
b/api/src/test/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetupTokenMetricsTest.java
index cde9f683..8e47105f 100644
--- 
a/api/src/test/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetupTokenMetricsTest.java
+++ 
b/api/src/test/java/org/apache/flink/agents/api/chat/model/BaseChatModelSetupTokenMetricsTest.java
@@ -19,15 +19,20 @@
 package org.apache.flink.agents.api.chat.model;
 
 import org.apache.flink.agents.api.metrics.FlinkAgentsMetricGroup;
+import org.apache.flink.agents.api.metrics.UpdatableGauge;
 import org.apache.flink.agents.api.resource.ResourceContext;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -72,7 +77,7 @@ class BaseChatModelSetupTokenMetricsTest {
         mockPromptTokensCounter = mock(Counter.class);
         mockCompletionTokensCounter = mock(Counter.class);
 
-        when(mockMetricGroup.getSubGroup("gpt-4")).thenReturn(mockModelGroup);
+        when(mockMetricGroup.getSubGroup("model", 
"gpt-4")).thenReturn(mockModelGroup);
         
when(mockModelGroup.getCounter("promptTokens")).thenReturn(mockPromptTokensCounter);
         
when(mockModelGroup.getCounter("completionTokens")).thenReturn(mockCompletionTokensCounter);
     }
@@ -84,7 +89,7 @@ class BaseChatModelSetupTokenMetricsTest {
 
         setup.recordTokenMetrics("gpt-4", 100, 50);
 
-        verify(mockMetricGroup).getSubGroup("gpt-4");
+        verify(mockMetricGroup).getSubGroup("model", "gpt-4");
         verify(mockModelGroup).getCounter("promptTokens");
         verify(mockModelGroup).getCounter("completionTokens");
         verify(mockPromptTokensCounter).inc(100);
@@ -108,15 +113,15 @@ class BaseChatModelSetupTokenMetricsTest {
         Counter mockGpt35PromptCounter = mock(Counter.class);
         Counter mockGpt35CompletionCounter = mock(Counter.class);
 
-        
when(mockMetricGroup.getSubGroup("gpt-3.5-turbo")).thenReturn(mockGpt35Group);
+        when(mockMetricGroup.getSubGroup("model", 
"gpt-3.5-turbo")).thenReturn(mockGpt35Group);
         
when(mockGpt35Group.getCounter("promptTokens")).thenReturn(mockGpt35PromptCounter);
         
when(mockGpt35Group.getCounter("completionTokens")).thenReturn(mockGpt35CompletionCounter);
 
         setup.recordTokenMetrics("gpt-4", 100, 50);
         setup.recordTokenMetrics("gpt-3.5-turbo", 200, 100);
 
-        verify(mockMetricGroup).getSubGroup("gpt-4");
-        verify(mockMetricGroup).getSubGroup("gpt-3.5-turbo");
+        verify(mockMetricGroup).getSubGroup("model", "gpt-4");
+        verify(mockMetricGroup).getSubGroup("model", "gpt-3.5-turbo");
         verify(mockPromptTokensCounter).inc(100);
         verify(mockCompletionTokensCounter).inc(50);
         verify(mockGpt35PromptCounter).inc(200);
@@ -128,4 +133,98 @@ class BaseChatModelSetupTokenMetricsTest {
     void testResourceType() {
         assertEquals(ResourceType.CHAT_MODEL, setup.getResourceType());
     }
+
+    // ========================================================================
+    // Value-based tests using TestMetricGroup (non-Mockito)
+    // ========================================================================
+
+    private static class TestMetricGroup implements FlinkAgentsMetricGroup {
+        final Map<String, TestMetricGroup> subGroups = new HashMap<>();
+        final Map<String, SimpleCounter> counters = new HashMap<>();
+
+        @Override
+        public FlinkAgentsMetricGroup getSubGroup(String name) {
+            return subGroups.computeIfAbsent(name, k -> new TestMetricGroup());
+        }
+
+        @Override
+        public FlinkAgentsMetricGroup getSubGroup(String key, String value) {
+            return subGroups.computeIfAbsent(key + "=" + value, k -> new 
TestMetricGroup());
+        }
+
+        @Override
+        public Counter getCounter(String name) {
+            return counters.computeIfAbsent(name, k -> new SimpleCounter());
+        }
+
+        @Override
+        public UpdatableGauge getGauge(String name) {
+            return null;
+        }
+
+        @Override
+        public Meter getMeter(String name) {
+            return null;
+        }
+
+        @Override
+        public Meter getMeter(String name, Counter counter) {
+            return null;
+        }
+
+        @Override
+        public Histogram getHistogram(String name) {
+            return null;
+        }
+
+        @Override
+        public Histogram getHistogram(String name, int windowSize) {
+            return null;
+        }
+    }
+
+    @Test
+    @DisplayName("Value-based: token counters are accessible under model 
key-value group")
+    void testTokenMetricsUnderModelKeyValueGroup() {
+        TestMetricGroup root = new TestMetricGroup();
+        setup.setMetricGroup(root);
+
+        setup.recordTokenMetrics("gpt-4", 100, 50);
+
+        TestMetricGroup modelGroup = (TestMetricGroup) 
root.getSubGroup("model", "gpt-4");
+        assertEquals(100, modelGroup.counters.get("promptTokens").getCount());
+        assertEquals(50, 
modelGroup.counters.get("completionTokens").getCount());
+    }
+
+    @Test
+    @DisplayName("Value-based: different models have independent counters")
+    void testDifferentModelsHaveIndependentCounters() {
+        TestMetricGroup root = new TestMetricGroup();
+        setup.setMetricGroup(root);
+
+        setup.recordTokenMetrics("gpt-4", 100, 50);
+        setup.recordTokenMetrics("gpt-3.5-turbo", 200, 80);
+
+        TestMetricGroup gpt4 = (TestMetricGroup) root.getSubGroup("model", 
"gpt-4");
+        TestMetricGroup gpt35 = (TestMetricGroup) root.getSubGroup("model", 
"gpt-3.5-turbo");
+
+        assertEquals(100, gpt4.counters.get("promptTokens").getCount());
+        assertEquals(50, gpt4.counters.get("completionTokens").getCount());
+        assertEquals(200, gpt35.counters.get("promptTokens").getCount());
+        assertEquals(80, gpt35.counters.get("completionTokens").getCount());
+    }
+
+    @Test
+    @DisplayName("Value-based: counters accumulate across multiple calls")
+    void testCountersAccumulate() {
+        TestMetricGroup root = new TestMetricGroup();
+        setup.setMetricGroup(root);
+
+        setup.recordTokenMetrics("gpt-4", 100, 50);
+        setup.recordTokenMetrics("gpt-4", 150, 75);
+
+        TestMetricGroup modelGroup = (TestMetricGroup) 
root.getSubGroup("model", "gpt-4");
+        assertEquals(250, modelGroup.counters.get("promptTokens").getCount());
+        assertEquals(125, 
modelGroup.counters.get("completionTokens").getCount());
+    }
 }
diff --git a/docs/content/docs/operations/monitoring.md 
b/docs/content/docs/operations/monitoring.md
index 1925c7f6..68012f01 100644
--- a/docs/content/docs/operations/monitoring.md
+++ b/docs/content/docs/operations/monitoring.md
@@ -36,18 +36,18 @@ We offer data monitoring for built-in metrics, which 
includes events, actions, a
 | **Agent** | numOfEventProcessedPerSec                        | The number of 
Events this operator has processed per second.                     | Meter |
 | **Agent** | numOfActionsExecuted                             | The total 
number of actions this operator has executed.                          | Count |
 | **Agent** | numOfActionsExecutedPerSec                       | The number of 
actions this operator has executed per second.                     | Meter |
-| **Action**  | <action_name>.numOfActionsExecuted | The total number of 
actions this operator has executed for a specific action name. | Count |
-| **Action**  | <action_name>.numOfActionsExecutedPerSec | The number of 
actions this operator has executed per second for a specific action name. | 
Meter |
+| **Action**  | action.\<action_name\>.numOfActionsExecuted | The total number 
of actions this operator has executed for a specific action name. | Count |
+| **Action**  | action.\<action_name\>.numOfActionsExecutedPerSec | The number 
of actions this operator has executed per second for a specific action name. | 
Meter |
 | **Agent**   | eventLogTruncatedEvents                          | Number of 
event log records whose payload was truncated at `STANDARD` level. Increments 
once per event, regardless of how many fields inside it were truncated. Use 
this to decide whether to raise truncation thresholds or move specific event 
types to `VERBOSE`. | Count |
 
 #### Token Usage Metrics
 
 Token usage metrics are automatically recorded when chat models are invoked 
through `ChatModelConnection`. These metrics help track LLM API usage and costs.
 
-| Scope     | Metrics                                     | Description        
                                                            | Type  |
-|-----------|---------------------------------------------|--------------------------------------------------------------------------------|-------|
-| **Model** | <action_name>.<model_name>.promptTokens     | The total number 
of prompt tokens consumed by the model within an action.      | Count |
-| **Model** | <action_name>.<model_name>.completionTokens | The total number 
of completion tokens generated by the model within an action. | Count |
+| Scope     | Metrics                                                      | 
Description                                                                    
| Type  |
+|-----------|--------------------------------------------------------------|--------------------------------------------------------------------------------|-------|
+| **Model** | action.\<action_name\>.model.\<model_name\>.promptTokens     | 
The total number of prompt tokens consumed by the model within an action.      
| Count |
+| **Model** | action.\<action_name\>.model.\<model_name\>.completionTokens | 
The total number of completion tokens generated by the model within an action. 
| Count |
 
 ### How to add custom metrics
 
@@ -113,7 +113,7 @@ public class MyAgent extends Agent {
 
 ### How to check the metrics with Flink executor
 
-Flink agents enable the reporting of metrics to external systems by creating a 
metric identifier prefix in the format 
`<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>`. Please 
refer to [Flink Metric 
Reporters](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/metric_reporters/)
 for more details.
+Flink agents enable the reporting of metrics to external systems by creating a 
metric identifier prefix in the format 
`<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>`. 
Agent-specific metrics use key-value metric groups (e.g., 
`action.<action_name>`, `model.<model_name>`) which are exposed as 
dimensions/labels in reporters that support them (such as Prometheus). Please 
refer to [Flink Metric 
Reporters](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/metric_reporters/)
 for more details.
 
 Additionally, we can check the metric results in the Flink Job WebUI using the 
metric identifier prefix `<subtask_index>.<operator_name>`.
 
diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml 
b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
index 44d76579..119edaf8 100644
--- a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
+++ b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
@@ -109,6 +109,25 @@ under the License.
             <artifactId>jackson-datatype-jsr310</artifactId>
             <version>${jackson.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>4.12.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <profiles>
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/TokenMetricsE2EAgent.java
 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/TokenMetricsE2EAgent.java
new file mode 100644
index 00000000..9cefc284
--- /dev/null
+++ 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/TokenMetricsE2EAgent.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integration.test;
+
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.OutputEvent;
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.agents.api.annotation.Action;
+import org.apache.flink.agents.api.annotation.ChatModelConnection;
+import org.apache.flink.agents.api.annotation.ChatModelSetup;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.event.ChatRequestEvent;
+import org.apache.flink.agents.api.event.ChatResponseEvent;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceName;
+
+import java.util.Collections;
+
+/**
+ * Minimal agent for token metrics e2e testing. Uses OpenAI completions 
connection pointing to a
+ * MockWebServer endpoint. Set {@link #apiBaseUrl} before building the agent 
plan.
+ */
+public class TokenMetricsE2EAgent extends Agent {
+
+    static String apiBaseUrl;
+
+    @ChatModelConnection
+    public static ResourceDescriptor chatModelConnection() {
+        return ResourceDescriptor.Builder.newBuilder(
+                        ResourceName.ChatModel.OPENAI_COMPLETIONS_CONNECTION)
+                .addInitialArgument("api_key", "test-key")
+                .addInitialArgument("api_base_url", apiBaseUrl)
+                .build();
+    }
+
+    @ChatModelSetup
+    public static ResourceDescriptor chatModel() {
+        return ResourceDescriptor.Builder.newBuilder(
+                        ResourceName.ChatModel.OPENAI_COMPLETIONS_SETUP)
+                .addInitialArgument("connection", "chatModelConnection")
+                .addInitialArgument("model", "gpt-4o-mini")
+                .build();
+    }
+
+    @Action(listenEventTypes = {InputEvent.EVENT_TYPE})
+    public static void process(InputEvent event, RunnerContext ctx) throws 
Exception {
+        ctx.sendEvent(
+                new ChatRequestEvent(
+                        "chatModel",
+                        Collections.singletonList(
+                                new ChatMessage(MessageRole.USER, (String) 
event.getInput()))));
+    }
+
+    @Action(listenEventTypes = {ChatResponseEvent.EVENT_TYPE})
+    public static void processChatResponse(ChatResponseEvent event, 
RunnerContext ctx) {
+        ctx.sendEvent(new OutputEvent(event.getResponse().getContent()));
+    }
+}
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/TokenMetricsE2ETest.java
 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/TokenMetricsE2ETest.java
new file mode 100644
index 00000000..5b0982b2
--- /dev/null
+++ 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/TokenMetricsE2ETest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.integration.test;
+
+import okhttp3.mockwebserver.Dispatcher;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okhttp3.mockwebserver.RecordedRequest;
+import org.apache.flink.agents.api.AgentsExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * End-to-end test verifying that agent metrics with key-value metric group 
dimensions flow through
+ * the real Flink metric system. Uses {@link InMemoryReporter} to capture 
metrics and {@link
+ * MockWebServer} as a stand-in for the LLM endpoint.
+ *
+ * <p>The test validates the <b>complete</b> metric identifier. The 
TaskManager resource ID is
+ * extracted at runtime via {@link #PREFIX_PATTERN}, then each expected 
metric's full identifier is
+ * constructed from {@link #PREFIX_TEMPLATE} and matched exactly. To add 
coverage for a new metric,
+ * add an entry to {@link #EXPECTED_AGENT_COUNTERS}.
+ */
+class TokenMetricsE2ETest {
+
+    private static final String CANNED_RESPONSE =
+            
"{\"id\":\"chatcmpl-test\",\"object\":\"chat.completion\",\"created\":0,"
+                    + "\"model\":\"gpt-4o-mini\","
+                    + "\"choices\":[{\"index\":0,"
+                    + 
"\"message\":{\"role\":\"assistant\",\"content\":\"Hello!\"},"
+                    + "\"finish_reason\":\"stop\"}],"
+                    + 
"\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":5,\"total_tokens\":15}}";
+
+    private static final Pattern PREFIX_PATTERN =
+            Pattern.compile(
+                    "^\\.taskmanager\\.([a-f0-9-]+)\\.Flink Streaming 
Job\\.action-execute-operator\\.0\\.");
+
+    private static final String PREFIX_TEMPLATE =
+            ".taskmanager.%s.Flink Streaming Job.action-execute-operator.0.";
+
+    /**
+     * Expected agent Counter metrics. Each key is the deterministic suffix 
after the prefix in the
+     * full metric identifier. The value is the expected counter value after 
processing 2 input
+     * records.
+     *
+     * <p>To extend: add one entry per new counter metric.
+     */
+    private static final Map<String, Long> EXPECTED_AGENT_COUNTERS = 
buildExpectedCounters();
+
+    private static Map<String, Long> buildExpectedCounters() {
+        Map<String, Long> m = new LinkedHashMap<>();
+        // Token metrics — validates action.<name>.model.<model>.<counter> 
hierarchy
+        // 2 chat requests x 10 prompt tokens = 20
+        m.put("action.chat_model_action.model.gpt-4o-mini.promptTokens", 20L);
+        // 2 chat requests x 5 completion tokens = 10
+        m.put("action.chat_model_action.model.gpt-4o-mini.completionTokens", 
10L);
+        return m;
+    }
+
+    private static final InMemoryReporter REPORTER = 
InMemoryReporter.createWithRetainedMetrics();
+
+    @RegisterExtension
+    static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(createClusterConfig())
+                            .setNumberSlotsPerTaskManager(2)
+                            .setNumberTaskManagers(1)
+                            .build());
+
+    private static Configuration createClusterConfig() {
+        Configuration config = new Configuration();
+        REPORTER.addToConfiguration(config);
+        return config;
+    }
+
+    private MockWebServer mockServer;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        mockServer = new MockWebServer();
+        mockServer.setDispatcher(
+                new Dispatcher() {
+                    @Override
+                    public MockResponse dispatch(RecordedRequest request) {
+                        return new MockResponse()
+                                .setHeader("Content-Type", "application/json")
+                                .setBody(CANNED_RESPONSE);
+                    }
+                });
+        mockServer.start();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        mockServer.shutdown();
+    }
+
+    @Test
+    @DisplayName("Agent metrics are registered with correct key-value group 
hierarchy")
+    void testAgentMetricsWithFullIdentifierValidation() throws Exception {
+        TokenMetricsE2EAgent.apiBaseUrl = mockServer.url("/v1").toString();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<String> inputStream = env.fromData("What is 1+1?", "Hello 
world");
+
+        AgentsExecutionEnvironment agentsEnv =
+                AgentsExecutionEnvironment.getExecutionEnvironment(env);
+
+        DataStream<Object> outputStream =
+                agentsEnv
+                        .fromDataStream(inputStream, (KeySelector<String, 
String>) value -> value)
+                        .apply(new TokenMetricsE2EAgent())
+                        .toDataStream();
+
+        CloseableIterator<Object> results = outputStream.collectAsync();
+        agentsEnv.execute();
+
+        List<Object> collected = new ArrayList<>();
+        while (results.hasNext()) {
+            collected.add(results.next());
+        }
+        assertThat(collected).hasSize(2);
+
+        Map<MetricGroup, Map<String, Metric>> byGroup = 
REPORTER.getMetricsByGroup();
+
+        String taskManagerId = extractTaskManagerId(byGroup);
+        assertThat(taskManagerId)
+                .as(
+                        "Should extract TaskManager resource ID from metrics "
+                                + "matching prefix pattern: %s",
+                        PREFIX_PATTERN.pattern())
+                .isNotNull();
+
+        String metricIdPrefix = String.format(PREFIX_TEMPLATE, taskManagerId);
+
+        for (Map.Entry<String, Long> expected : 
EXPECTED_AGENT_COUNTERS.entrySet()) {
+            String expectedSuffix = expected.getKey();
+            long expectedValue = expected.getValue();
+            String expectedFullId = metricIdPrefix + expectedSuffix;
+
+            List<Counter> counters = findAllCountersByExactId(byGroup, 
expectedFullId);
+
+            assertThat(counters)
+                    .as("Exactly one counter with metric id '%s'", 
expectedFullId)
+                    .hasSize(1);
+
+            assertThat(counters.get(0).getCount())
+                    .as("Counter value for '%s'", expectedFullId)
+                    .isEqualTo(expectedValue);
+        }
+    }
+
+    /**
+     * Extracts the TaskManager resource ID by matching any operator metric 
against {@link
+     * #PREFIX_PATTERN}. Returns {@code null} if no metric matches the 
expected prefix structure,
+     * which would indicate the framework changed its metric hierarchy.
+     */
+    private static String extractTaskManagerId(Map<MetricGroup, Map<String, 
Metric>> byGroup) {
+        for (Map.Entry<MetricGroup, Map<String, Metric>> groupEntry : 
byGroup.entrySet()) {
+            MetricGroup group = groupEntry.getKey();
+            for (String metricName : groupEntry.getValue().keySet()) {
+                String fullId = group.getMetricIdentifier(metricName);
+                Matcher matcher = PREFIX_PATTERN.matcher(fullId);
+                if (matcher.find()) {
+                    return matcher.group(1);
+                }
+            }
+        }
+        return null;
+    }
+
+    private static List<Counter> findAllCountersByExactId(
+            Map<MetricGroup, Map<String, Metric>> byGroup, String expectedId) {
+        List<Counter> result = new ArrayList<>();
+        for (Map.Entry<MetricGroup, Map<String, Metric>> groupEntry : 
byGroup.entrySet()) {
+            MetricGroup group = groupEntry.getKey();
+            for (Map.Entry<String, Metric> metricEntry : 
groupEntry.getValue().entrySet()) {
+                if (!(metricEntry.getValue() instanceof Counter)) {
+                    continue;
+                }
+                String fullId = 
group.getMetricIdentifier(metricEntry.getKey());
+                if (expectedId.equals(fullId)) {
+                    result.add((Counter) metricEntry.getValue());
+                }
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java 
b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java
index 504b4fb9..df28c1d4 100644
--- 
a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java
+++ 
b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java
@@ -176,7 +176,7 @@ public class ChatModelAction {
         }
         FlinkAgentsMetricGroup metricGroup = ctx.getActionMetricGroup();
         if (metricGroup != null) {
-            FlinkAgentsMetricGroup modelGroup = metricGroup.getSubGroup(model);
+            FlinkAgentsMetricGroup modelGroup = 
metricGroup.getSubGroup("model", model);
             modelGroup.getCounter("retryCount").inc(retryCount);
             modelGroup.getCounter("retryWaitSec").inc(totalRetryWaitSec);
         }
diff --git 
a/plan/src/test/java/org/apache/flink/agents/plan/actions/ChatModelActionRetryTest.java
 
b/plan/src/test/java/org/apache/flink/agents/plan/actions/ChatModelActionRetryTest.java
index a66e36f9..8c139505 100644
--- 
a/plan/src/test/java/org/apache/flink/agents/plan/actions/ChatModelActionRetryTest.java
+++ 
b/plan/src/test/java/org/apache/flink/agents/plan/actions/ChatModelActionRetryTest.java
@@ -90,7 +90,8 @@ class ChatModelActionRetryTest {
                 .thenAnswer(inv -> inv.<DurableCallable<ChatMessage>>getArgument(0).call());
 
         // Wire up metric group chain
-        when(mockActionMetricGroup.getSubGroup(anyString())).thenReturn(mockModelMetricGroup);
+        when(mockActionMetricGroup.getSubGroup(anyString(), anyString()))
+                .thenReturn(mockModelMetricGroup);
         when(mockModelMetricGroup.getCounter("retryCount")).thenReturn(mockRetryCountCounter);
         when(mockModelMetricGroup.getCounter("retryWaitSec")).thenReturn(mockRetryWaitSecCounter);
     }
@@ -123,7 +124,7 @@ class ChatModelActionRetryTest {
         assertThat(responseEvent.getTotalRetryWaitSec()).isEqualTo(0);
 
         // No retry metrics should be recorded
-        verify(mockActionMetricGroup, never()).getSubGroup(anyString());
+        verify(mockActionMetricGroup, never()).getSubGroup(anyString(), anyString());
     }
 
     @Test
@@ -163,7 +164,7 @@ class ChatModelActionRetryTest {
         assertThat(elapsed).isGreaterThanOrEqualTo(1000L);
 
         // Verify metrics recorded under connection name
-        verify(mockActionMetricGroup).getSubGroup(mockChatModel.getConnectionName());
+        verify(mockActionMetricGroup).getSubGroup("model", mockChatModel.getConnectionName());
         verify(mockRetryCountCounter).inc(1);
         verify(mockRetryWaitSecCounter).inc(1);
     }
diff --git a/python/flink_agents/api/chat_models/chat_model.py b/python/flink_agents/api/chat_models/chat_model.py
index 7cbe6e12..ac4a814a 100644
--- a/python/flink_agents/api/chat_models/chat_model.py
+++ b/python/flink_agents/api/chat_models/chat_model.py
@@ -278,7 +278,7 @@ class BaseChatModelSetup(Resource):
         if metric_group is None:
             return
 
-        model_group = metric_group.get_sub_group(model_name)
+        model_group = metric_group.get_sub_group("model", model_name)
         model_group.get_counter("promptTokens").inc(prompt_tokens)
         model_group.get_counter("completionTokens").inc(completion_tokens)
 
diff --git a/python/flink_agents/api/chat_models/tests/test_token_metrics.py b/python/flink_agents/api/chat_models/tests/test_token_metrics.py
index d987d121..e8195151 100644
--- a/python/flink_agents/api/chat_models/tests/test_token_metrics.py
+++ b/python/flink_agents/api/chat_models/tests/test_token_metrics.py
@@ -75,10 +75,11 @@ class _MockMetricGroup(MetricGroup):
         self._sub_groups: dict[str, _MockMetricGroup] = {}
         self._counters: dict[str, _MockCounter] = {}
 
-    def get_sub_group(self, name: str) -> "_MockMetricGroup":
-        if name not in self._sub_groups:
-            self._sub_groups[name] = _MockMetricGroup()
-        return self._sub_groups[name]
+    def get_sub_group(self, name: str, value: str | None = None) -> "_MockMetricGroup":
+        key = f"{name}={value}" if value is not None else name
+        if key not in self._sub_groups:
+            self._sub_groups[key] = _MockMetricGroup()
+        return self._sub_groups[key]
 
     def get_counter(self, name: str) -> _MockCounter:
         if name not in self._counters:
@@ -110,7 +111,7 @@ class TestBaseChatModelTokenMetrics:
         chat_model.test_record_token_metrics("gpt-4", 100, 50)
 
         # Verify the metrics were recorded
-        model_group = mock_metric_group.get_sub_group("gpt-4")
+        model_group = mock_metric_group.get_sub_group("model", "gpt-4")
         assert model_group.get_counter("promptTokens").get_count() == 100
         assert model_group.get_counter("completionTokens").get_count() == 50
 
@@ -138,8 +139,8 @@ class TestBaseChatModelTokenMetrics:
         chat_model.test_record_token_metrics("gpt-3.5-turbo", 200, 100)
 
         # Verify each model has its own counters
-        gpt4_group = mock_metric_group.get_sub_group("gpt-4")
-        gpt35_group = mock_metric_group.get_sub_group("gpt-3.5-turbo")
+        gpt4_group = mock_metric_group.get_sub_group("model", "gpt-4")
+        gpt35_group = mock_metric_group.get_sub_group("model", "gpt-3.5-turbo")
 
         assert gpt4_group.get_counter("promptTokens").get_count() == 100
         assert gpt4_group.get_counter("completionTokens").get_count() == 50
@@ -159,7 +160,7 @@ class TestBaseChatModelTokenMetrics:
         chat_model.test_record_token_metrics("gpt-4", 150, 75)
 
         # Verify the metrics accumulated
-        model_group = mock_metric_group.get_sub_group("gpt-4")
+        model_group = mock_metric_group.get_sub_group("model", "gpt-4")
         assert model_group.get_counter("promptTokens").get_count() == 250
         assert model_group.get_counter("completionTokens").get_count() == 125
 
diff --git a/python/flink_agents/api/metric_group.py b/python/flink_agents/api/metric_group.py
index 78bc629c..12525294 100644
--- a/python/flink_agents/api/metric_group.py
+++ b/python/flink_agents/api/metric_group.py
@@ -25,13 +25,19 @@ class MetricGroup(ABC):
     """
 
     @abstractmethod
-    def get_sub_group(self, name: str) -> "MetricGroup":
-        """Create or retrieve a sub-metric group with the given name.
+    def get_sub_group(self, name: str, value: str | None = None) -> "MetricGroup":
+        """Create or retrieve a sub-metric group.
+
+        When *value* is ``None`` a plain named sub-group is returned.
+        When *value* is given, *name* is treated as the key and a
+        key-value sub-group is returned.
 
         Parameters
         ----------
         name : str
-            The name of the sub metric group.
+            The name (or key) of the sub metric group.
+        value : str, optional
+            The value of the metric group variable.
         """
 
     @abstractmethod
diff --git a/python/flink_agents/plan/actions/chat_model_action.py b/python/flink_agents/plan/actions/chat_model_action.py
index bbb6866d..188eb9e7 100644
--- a/python/flink_agents/plan/actions/chat_model_action.py
+++ b/python/flink_agents/plan/actions/chat_model_action.py
@@ -162,7 +162,7 @@ def _record_retry_metrics(
         return
     metric_group = ctx.action_metric_group
     if metric_group is not None:
-        model_group = metric_group.get_sub_group(model)
+        model_group = metric_group.get_sub_group("model", model)
         model_group.get_counter("retryCount").inc(retry_count)
         model_group.get_counter("retryWaitSec").inc(total_retry_wait_sec)
 
diff --git a/python/flink_agents/plan/tests/actions/test_chat_model_action_retry.py b/python/flink_agents/plan/tests/actions/test_chat_model_action_retry.py
index 879a02e7..e1f04f62 100644
--- a/python/flink_agents/plan/tests/actions/test_chat_model_action_retry.py
+++ b/python/flink_agents/plan/tests/actions/test_chat_model_action_retry.py
@@ -66,10 +66,11 @@ class _MockMetricGroup(MetricGroup):
         self._sub_groups: dict[str, _MockMetricGroup] = {}
         self._counters: dict[str, _MockCounter] = {}
 
-    def get_sub_group(self, name: str) -> "_MockMetricGroup":
-        if name not in self._sub_groups:
-            self._sub_groups[name] = _MockMetricGroup()
-        return self._sub_groups[name]
+    def get_sub_group(self, name: str, value: str | None = None) -> "_MockMetricGroup":
+        key = f"{name}={value}" if value is not None else name
+        if key not in self._sub_groups:
+            self._sub_groups[key] = _MockMetricGroup()
+        return self._sub_groups[key]
 
     def get_counter(self, name: str) -> _MockCounter:
         if name not in self._counters:
@@ -218,7 +219,7 @@ class TestChatModelActionRetry:
         assert elapsed >= 1.0
 
         # Verify metrics recorded under connection name
-        model_group = metric_group.get_sub_group(chat_model.connection)
+        model_group = metric_group.get_sub_group("model", chat_model.connection)
         assert model_group.get_counter("retryCount").get_count() == 1
         assert model_group.get_counter("retryWaitSec").get_count() == 1
 
diff --git a/python/flink_agents/runtime/flink_metric_group.py b/python/flink_agents/runtime/flink_metric_group.py
index 59a2ae52..5c731e6a 100644
--- a/python/flink_agents/runtime/flink_metric_group.py
+++ b/python/flink_agents/runtime/flink_metric_group.py
@@ -36,7 +36,15 @@ class FlinkMetricGroup(MetricGroup):
         self._j_metric_group = j_metric_group
 
     @override
-    def get_sub_group(self, name: str) -> "FlinkMetricGroup":
+    def get_sub_group(self, name: str, value: str | None = None) -> "FlinkMetricGroup":
+        if value is not None:
+            return FlinkMetricGroup(self._j_metric_group.getSubGroup(name, value))
+        if "=" in name:
+            msg = (
+                f"Sub-group name must not contain '=' (got '{name}'). "
+                "Use get_sub_group(name, value) for key-value groups."
+            )
+            raise ValueError(msg)
         return FlinkMetricGroup(self._j_metric_group.getSubGroup(name))
 
     @override
diff --git a/python/flink_agents/runtime/memory/mem0/mem0_long_term_memory.py b/python/flink_agents/runtime/memory/mem0/mem0_long_term_memory.py
index b4085827..bcd81dfd 100644
--- a/python/flink_agents/runtime/memory/mem0/mem0_long_term_memory.py
+++ b/python/flink_agents/runtime/memory/mem0/mem0_long_term_memory.py
@@ -455,7 +455,7 @@ class Mem0LongTermMemory(InternalBaseLongTermMemory):
                 and metric.get("promptTokens")
                 and metric.get("completionTokens")
             ):
-                model_group = self.metric_group.get_sub_group(metric["model_name"])
+                model_group = self.metric_group.get_sub_group("model", metric["model_name"])
                 model_group.get_counter("promptTokens").inc(metric["promptTokens"])
                 model_group.get_counter("completionTokens").inc(
                     metric["completionTokens"]
diff --git a/python/flink_agents/runtime/memory/mem0/tests/test_mem0_long_term_memory.py b/python/flink_agents/runtime/memory/mem0/tests/test_mem0_long_term_memory.py
index df04eb3c..9911bb44 100644
--- a/python/flink_agents/runtime/memory/mem0/tests/test_mem0_long_term_memory.py
+++ b/python/flink_agents/runtime/memory/mem0/tests/test_mem0_long_term_memory.py
@@ -400,10 +400,11 @@ class MockMetricGroup:
         self._sub_groups: dict[str, MockMetricGroup] = {}
         self._counters: dict[str, int] = {}
 
-    def get_sub_group(self, name: str) -> "MockMetricGroup":
-        if name not in self._sub_groups:
-            self._sub_groups[name] = MockMetricGroup()
-        return self._sub_groups[name]
+    def get_sub_group(self, name: str, value: str | None = None) -> "MockMetricGroup":
+        key = f"{name}={value}" if value is not None else name
+        if key not in self._sub_groups:
+            self._sub_groups[key] = MockMetricGroup()
+        return self._sub_groups[key]
 
     def get_counter(self, name: str) -> "MockCounter":
         return MockCounter(self, name)
@@ -468,7 +469,7 @@ def test_token_usage_reported_on_switch_context() -> None:
 
     # Metrics are still in the queue — not reported until switch_context.
     ltm_group = metric_group.get_sub_group("long-term-memory")
-    model_group = ltm_group.get_sub_group("mock-model")
+    model_group = ltm_group.get_sub_group("model", "mock-model")
     assert model_group._counters.get("promptTokens", 0) == 0
 
     # switch_context drains the queue on the mailbox thread.
@@ -530,7 +531,7 @@ def test_token_usage_flushed_on_close() -> None:
     ltm.close()
 
     ltm_group = metric_group.get_sub_group("long-term-memory")
-    model_group = ltm_group.get_sub_group("mock-model")
+    model_group = ltm_group.get_sub_group("model", "mock-model")
     # 2 adds x 2 LLM calls each x 10 prompt tokens = 40
     assert model_group._counters.get("promptTokens", 0) == 40
     # 2 adds x 2 LLM calls each x 5 completion tokens = 20
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index b0603ecb..1395bdc5 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -143,7 +143,7 @@ public class RunnerContextImpl implements RunnerContext {
 
     @Override
     public FlinkAgentsMetricGroupImpl getActionMetricGroup() {
-        return agentMetricGroup.getSubGroup(actionName);
+        return agentMetricGroup.getSubGroup("action", actionName);
     }
 
     @Override
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
index 59e0daaa..6bbc03aa 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
@@ -56,7 +56,7 @@ public class BuiltInMetrics {
         for (String actionName : agentPlan.getActions().keySet()) {
             actionMetricGroups.put(
                     actionName,
-                    new BuiltInActionMetrics(parentMetricGroup.getSubGroup(actionName)));
+                    new BuiltInActionMetrics(parentMetricGroup.getSubGroup("action", actionName)));
         }
     }
 
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java
index 6b0a2365..8360a9f0 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java
@@ -27,6 +27,7 @@ import org.apache.flink.metrics.MeterView;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
 import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+import org.apache.flink.util.Preconditions;
 
 import java.util.HashMap;
 
@@ -53,12 +54,26 @@ public class FlinkAgentsMetricGroupImpl extends ProxyMetricGroup<MetricGroup>
     }
 
     public FlinkAgentsMetricGroupImpl getSubGroup(String name) {
+        Preconditions.checkArgument(
+                !name.contains("="),
+                "Sub-group name must not contain '=' (got '%s'). "
+                        + "Use getSubGroup(key, value) for key-value groups.",
+                name);
         if (!subMetricGroups.containsKey(name)) {
             subMetricGroups.put(name, new FlinkAgentsMetricGroupImpl(super.addGroup(name)));
         }
         return subMetricGroups.get(name);
     }
 
+    public FlinkAgentsMetricGroupImpl getSubGroup(String key, String value) {
+        String cacheKey = key + "=" + value;
+        if (!subMetricGroups.containsKey(cacheKey)) {
+            subMetricGroups.put(
+                    cacheKey, new FlinkAgentsMetricGroupImpl(super.addGroup(key, value)));
+        }
+        return subMetricGroups.get(cacheKey);
+    }
+
     public UpdatableGaugeImpl getGauge(String name) {
         if (!gauges.containsKey(name)) {
             gauges.put(name, super.gauge(name, new UpdatableGaugeImpl()));
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java
index d58f8cdf..86d60f01 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java
@@ -51,6 +51,44 @@ public class FlinkAgentsMetricGroupImplTest {
         assertEquals(result, metricGroup.getSubGroup(name));
     }
 
+    @Test
+    void testGetSubGroupWithKeyValue() {
+        String key = "model";
+        String value = "gpt-4";
+        FlinkAgentsMetricGroupImpl result = metricGroup.getSubGroup(key, value);
+
+        assertNotNull(result);
+        assertEquals(result, metricGroup.getSubGroup(key, value));
+    }
+
+    @Test
+    void testKeyValueSubGroupIsolatedFromNamedSubGroup() {
+        FlinkAgentsMetricGroupImpl named = metricGroup.getSubGroup("model");
+        FlinkAgentsMetricGroupImpl kv = metricGroup.getSubGroup("model", "gpt-4");
+
+        assertNotSame(named, kv);
+
+        named.getCounter("c").inc(10);
+        kv.getCounter("c").inc(99);
+
+        assertEquals(10, named.getCounter("c").getCount());
+        assertEquals(99, kv.getCounter("c").getCount());
+    }
+
+    @Test
+    void testDifferentValuesCreateDistinctSubGroups() {
+        FlinkAgentsMetricGroupImpl gpt4 = metricGroup.getSubGroup("model", "gpt-4");
+        FlinkAgentsMetricGroupImpl gpt35 = metricGroup.getSubGroup("model", "gpt-3.5");
+
+        assertNotSame(gpt4, gpt35);
+
+        gpt4.getCounter("promptTokens").inc(100);
+        gpt35.getCounter("promptTokens").inc(200);
+
+        assertEquals(100, gpt4.getCounter("promptTokens").getCount());
+        assertEquals(200, gpt35.getCounter("promptTokens").getCount());
+    }
+
     @Test
     void testGetGauge() {
         String name = "testGauge";

Reply via email to