This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new b62ed11  [api][runtime] Introduce metric mechanism (#74)
b62ed11 is described below

commit b62ed1158c98cfcce528b8cc172e6896cd29fca1
Author: Eugene <105473769+greateugen...@users.noreply.github.com>
AuthorDate: Thu Jul 31 17:42:33 2025 +0800

    [api][runtime] Introduce metric mechanism (#74)
---
 api/pom.xml                                        |   9 ++
 .../flink/agents/api/context/RunnerContext.java    |  15 ++
 .../agents/api/metrics/FlinkAgentsMetricGroup.java |  92 ++++++++++++
 .../flink/agents/api/metrics/UpdatableGauge.java   |  37 +++++
 python/flink_agents/api/metric_group.py            | 139 ++++++++++++++++++
 python/flink_agents/api/runner_context.py          |  21 +++
 python/flink_agents/runtime/flink_metric_group.py  | 160 +++++++++++++++++++++
 .../flink_agents/runtime/flink_runner_context.py   |  23 +++
 python/flink_agents/runtime/local_runner.py        |  13 ++
 .../agents/runtime/context/RunnerContextImpl.java  |  24 +++-
 .../runtime/metrics/BuiltInActionMetrics.java      |  44 ++++++
 .../agents/runtime/metrics/BuiltInMetrics.java     |  72 ++++++++++
 .../metrics/FlinkAgentsMetricGroupImpl.java        | 104 ++++++++++++++
 .../agents/runtime/metrics/UpdatableGaugeImpl.java |  42 ++++++
 .../runtime/operator/ActionExecutionOperator.java  |  25 +++-
 .../python/context/PythonRunnerContextImpl.java    |   7 +-
 .../runtime/python/utils/PythonActionExecutor.java |  12 +-
 .../metrics/FlinkAgentsMetricGroupImplTest.java    | 110 ++++++++++++++
 .../runtime/metrics/UpdatableGaugeImplTest.java    | 145 +++++++++++++++++++
 19 files changed, 1083 insertions(+), 11 deletions(-)

diff --git a/api/pom.xml b/api/pom.xml
index d371967..512895a 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -28,4 +28,13 @@ under the License.
     <artifactId>flink-agents-api</artifactId>
     <name>Flink Agents : API</name>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-metrics-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
 </project>
\ No newline at end of file
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java 
b/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java
index f4bdb6b..ae7307b 100644
--- a/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java
+++ b/api/src/main/java/org/apache/flink/agents/api/context/RunnerContext.java
@@ -18,6 +18,7 @@
 package org.apache.flink.agents.api.context;
 
 import org.apache.flink.agents.api.Event;
+import org.apache.flink.agents.api.metrics.FlinkAgentsMetricGroup;
 
 /**
  * A context object used during action execution. It is responsible for 
collecting output events
@@ -38,4 +39,18 @@ public interface RunnerContext {
      * @throws Exception if the underlying state backend cannot be accessed
      */
     MemoryObject getShortTermMemory() throws Exception;
+
+    /**
+     * Gets the metric group for Flink Agents.
+     *
+     * @return the metric group shared across all actions.
+     */
+    FlinkAgentsMetricGroup getAgentMetricGroup();
+
+    /**
+     * Gets the individual metric group dedicated for each action.
+     *
+     * @return the individual metric group specific to the current action.
+     */
+    FlinkAgentsMetricGroup getActionMetricGroup();
 }
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java
 
b/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java
new file mode 100644
index 0000000..9ab3df3
--- /dev/null
+++ 
b/api/src/main/java/org/apache/flink/agents/api/metrics/FlinkAgentsMetricGroup.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.agents.api.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+
+/**
+ * Interface providing a metric group for action execution. This metric group offers access
+ * to various types of metrics.
+ */
+public interface FlinkAgentsMetricGroup {
+
+    /**
+     * Create or retrieve a sub-metric group with the given name.
+     *
+     * @param name The name of the sub metric group.
+     * @return the sub-metric group instance.
+     */
+    FlinkAgentsMetricGroup getSubGroup(String name);
+
+    /**
+     * Create or retrieve a gauge with the given name.
+     *
+     * <p>Note: We use StringGauge here to ensure consistency across Python 
and Java interactions,
+     * avoiding potential type conflicts by standardizing on String as the 
value type.
+     *
+     * @param name The name of the gauge.
+     * @return the updatable gauge instance.
+     */
+    UpdatableGauge getGauge(String name);
+
+    /**
+     * Create or retrieve a counter with the given name.
+     *
+     * @param name The name of the counter.
+     * @return the counter instance.
+     */
+    Counter getCounter(String name);
+
+    /**
+     * Create or retrieve a meter with the given name.
+     *
+     * @param name The name of the meter.
+     * @return the meter instance.
+     */
+    Meter getMeter(String name);
+
+    /**
+     * Create or retrieve a meter with the given name and associate it with 
the provided counter.
+     *
+     * @param name The name of the meter.
+     * @param counter The counter to associate with the meter.
+     * @return the meter instance.
+     */
+    Meter getMeter(String name, Counter counter);
+
+    /**
+     * Create or retrieve a histogram with the given name using the default 
window size.
+     *
+     * @param name The name of the histogram.
+     * @return the histogram instance.
+     */
+    Histogram getHistogram(String name);
+
+    /**
+     * Create or retrieve a histogram with the given name and specified window 
size.
+     *
+     * @param name The name of the histogram.
+     * @param windowSize The sliding window size for histogram statistics.
+     * @return the histogram instance.
+     */
+    Histogram getHistogram(String name, int windowSize);
+}
diff --git 
a/api/src/main/java/org/apache/flink/agents/api/metrics/UpdatableGauge.java 
b/api/src/main/java/org/apache/flink/agents/api/metrics/UpdatableGauge.java
new file mode 100644
index 0000000..43dd94b
--- /dev/null
+++ b/api/src/main/java/org/apache/flink/agents/api/metrics/UpdatableGauge.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.agents.api.metrics;
+
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * UpdatableGauge interface defines a simple gauge that extends the Gauge 
interface. This interface
+ * provides a method to update the gauge value.
+ *
+ * @param <T> The type parameter of the gauge value
+ */
+public interface UpdatableGauge<T> extends Gauge<T> {
+    /**
+     * Updates the value of the UpdatableGauge.
+     *
+     * @param value The new value to be set.
+     */
+    void update(T value);
+}
diff --git a/python/flink_agents/api/metric_group.py 
b/python/flink_agents/api/metric_group.py
new file mode 100644
index 0000000..5f07aac
--- /dev/null
+++ b/python/flink_agents/api/metric_group.py
@@ -0,0 +1,139 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from abc import ABC, abstractmethod
+
+
+class MetricGroup(ABC):
+    """Abstract base class providing metric group for action execution.
+
+    This metric group offers access to metrics.
+    """
+
+    @abstractmethod
+    def get_sub_group(self, name: str) -> "MetricGroup":
+        """Create or retrieve a sub-metric group with the given name.
+
+        Parameters
+        ----------
+        name : str
+            The name of the sub metric group.
+        """
+
+    @abstractmethod
+    def get_counter(self, name: str) -> "Counter":
+        """Create or retrieve a counter with the given name.
+
+        Parameters
+        ----------
+        name : str
+            The name of the counter.
+        """
+
+    @abstractmethod
+    def get_meter(self, name: str) -> "Meter":
+        """Create or retrieve a meter with the given name.
+
+        Parameters
+        ----------
+        name : str
+            The name of the meter.
+        """
+
+    @abstractmethod
+    def get_gauge(self, name: str) -> "Gauge":
+        """Create or retrieve a gauge with the given name.
+
+        Parameters
+        ----------
+        name : str
+            The name of the gauge.
+        """
+
+    @abstractmethod
+    def get_histogram(self, name: str, window_size: int = 100) -> "Histogram":
+        """Create or retrieve a histogram with the given name and window size.
+
+        Parameters
+        ----------
+        name : str
+            The name of the histogram.
+        window_size : int, optional
+            The sliding window size for histogram statistics.
+        """
+
+
+class Counter(ABC):
+    """A Counter that measures a count."""
+
+    @abstractmethod
+    def inc(self, n: int = 1) -> None:
+        """Increment the current count by the given value."""
+
+    @abstractmethod
+    def dec(self, n: int = 1) -> None:
+        """Decrement the current count by the given value."""
+
+    @abstractmethod
+    def get_count(self) -> int:
+        """Return the current count."""
+
+
+class Meter(ABC):
+    """Metric for measuring throughput."""
+
+    @abstractmethod
+    def mark(self, n: int =1) -> None:
+        """Trigger the meter by the given value."""
+
+    @abstractmethod
+    def get_rate(self) -> float:
+        """Return the current event rate per second."""
+
+
+class Histogram(ABC):
+    """The histogram allows to record values and create histogram statistics
+    for the currently seen elements.
+    """
+
+    @abstractmethod
+    def update(self, value: int) -> None:
+        """Update the histogram with the given value."""
+
+    @abstractmethod
+    def get_mean(self) -> float:
+        """Return the average value."""
+
+    @abstractmethod
+    def get_max(self) -> int:
+        """Return the maximum recorded value."""
+
+    @abstractmethod
+    def get_min(self) -> int:
+        """Return the minimum recorded value."""
+
+
+class Gauge(ABC):
+    """A gauge metric holding a value that can be updated and read back."""
+
+    @abstractmethod
+    def update(self, value: float) -> None:
+        """Update the current value of the gauge."""
+
+    @abstractmethod
+    def get_value(self) -> float:
+        """Return the current value of the gauge."""
diff --git a/python/flink_agents/api/runner_context.py 
b/python/flink_agents/api/runner_context.py
index a46da78..4042d90 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/runner_context.py
@@ -19,6 +19,7 @@ from abc import ABC, abstractmethod
 
 from flink_agents.api.event import Event
 from flink_agents.api.memory_object import MemoryObject
+from flink_agents.api.metric_group import MetricGroup
 from flink_agents.api.resource import Resource, ResourceType
 
 
@@ -59,3 +60,23 @@ class RunnerContext(ABC):
         MemoryObject
           The root object of the short-term memory.
         """
+
+    @abstractmethod
+    def get_agent_metric_group(self) -> MetricGroup:
+        """Get the metric group for flink agents.
+
+        Returns:
+        -------
+        MetricGroup
+            The metric group shared across all actions.
+        """
+
+    @abstractmethod
+    def get_action_metric_group(self) -> MetricGroup:
+        """Get the individual metric group dedicated for each action.
+
+        Returns:
+        -------
+        MetricGroup
+            The individual metric group specific to the current action.
+        """
diff --git a/python/flink_agents/runtime/flink_metric_group.py 
b/python/flink_agents/runtime/flink_metric_group.py
new file mode 100644
index 0000000..3172d00
--- /dev/null
+++ b/python/flink_agents/runtime/flink_metric_group.py
@@ -0,0 +1,160 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any
+
+from typing_extensions import override
+
+from flink_agents.api.metric_group import Counter, Gauge, Histogram, Meter, 
MetricGroup
+
+
+class FlinkMetricGroup(MetricGroup):
+    """Implementation of MetricGroup for flink execution environment."""
+    def __init__(self, j_metric_group: Any) -> None:
+        """Initialize a flink metric group with the given java metric group.
+
+        Parameters
+        ----------
+        j_metric_group : Any
+            Java metric group used to synchronize data with the Flink metric 
system.
+        """
+        self._j_metric_group = j_metric_group
+
+    @override
+    def get_sub_group(self, name: str) -> "FlinkMetricGroup":
+        return FlinkMetricGroup(self._j_metric_group.getSubGroup(name))
+
+    @override
+    def get_counter(self, name: str) -> "FlinkCounter":
+        return FlinkCounter(self._j_metric_group.getCounter(name))
+
+    @override
+    def get_meter(self, name: str) -> "FlinkMeter":
+        return FlinkMeter(self._j_metric_group.getMeter(name))
+
+    @override
+    def get_histogram(self, name: str, window_size: int =100) -> 
"FlinkHistogram":
+        return FlinkHistogram(self._j_metric_group.getHistogram(name, 
window_size))
+
+    @override
+    def get_gauge(self, name: str) -> "FlinkGauge":
+        return FlinkGauge(self._j_metric_group.getGauge(name))
+
+class FlinkCounter(Counter):
+    """Implementation of Counter for flink execution environment."""
+    def __init__(self, j_counter: Any) -> None:
+        """Initialize a flink counter with the given java counter.
+
+        Parameters
+        ----------
+        j_counter : Any
+            Java counter used for measuring the count of events.
+        """
+        self._j_counter = j_counter
+
+    @override
+    def inc(self, n: int = 1) -> None:
+        """Increment the current count by the given value."""
+        self._j_counter.inc(n)
+
+    @override
+    def dec(self, n: int = 1) -> None:
+        """Decrement the current count by the given value."""
+        self._j_counter.dec(n)
+
+    @override
+    def get_count(self) -> int:
+        """Return the current count."""
+        return self._j_counter.getCount()
+
+class FlinkMeter(Meter):
+    """Implementation of Meter for flink execution environment."""
+    def __init__(self, j_meter: Any) -> None:
+        """Initialize a flink meter with the given java meter.
+
+        Parameters
+        ----------
+        j_meter : Any
+            Java meter used for measuring throughput.
+        """
+        self._j_meter = j_meter
+
+    @override
+    def mark(self, n: int = 1) -> None:
+        """Mark the occurrence of n events."""
+        self._j_meter.markEvent(n)
+
+    @override
+    def get_rate(self) -> float:
+        """Return the current event rate per second."""
+        return self._j_meter.getRate()
+
+class FlinkHistogram(Histogram):
+    """Implementation of Histogram for flink execution environment."""
+    def __init__(self, j_histogram: Any) -> None:
+        """Initialize a flink histogram with the given java histogram.
+
+        Parameters
+        ----------
+        j_histogram : Any
+            Java histogram used for recording values and computing statistical
+            summaries.
+        """
+        self._j_histogram = j_histogram
+        self._j_statistics = j_histogram.getStatistics()
+
+    @override
+    def update(self, value: int) -> None:
+        """Record a new value into the histogram."""
+        self._j_histogram.update(value)
+
+    @override
+    def get_mean(self) -> float:
+        """Return the average value."""
+        return self._j_statistics.getMean()
+
+    @override
+    def get_max(self) -> int:
+        """Return the maximum recorded value."""
+        return self._j_statistics.getMax()
+
+    @override
+    def get_min(self) -> int:
+        """Return the minimum recorded value."""
+        return self._j_statistics.getMin()
+
+class FlinkGauge(Gauge):
+    """Implementation of Gauge for flink execution environment."""
+    def __init__(self, j_gauge: Any) -> None:
+        """Initialize a flink gauge with the given java gauge.
+
+        Parameters
+        ----------
+        j_gauge : Any
+            Java gauge for recording a string value.
+        """
+        self._j_gauge = j_gauge
+
+    @override
+    def update(self, value: float) -> None:
+        """Update the gauge with the given value."""
+        self._j_gauge.update(value)
+
+    @override
+    def get_value(self) -> float:
+        """Return the current value of the gauge."""
+        return self._j_gauge.getValue()
diff --git a/python/flink_agents/runtime/flink_runner_context.py 
b/python/flink_agents/runtime/flink_runner_context.py
index ed2f0f6..316918f 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -25,6 +25,7 @@ from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.api.runner_context import RunnerContext
 from flink_agents.plan.agent_plan import AgentPlan
 from flink_agents.runtime.flink_memory_object import FlinkMemoryObject
+from flink_agents.runtime.flink_metric_group import FlinkMetricGroup
 
 
 class FlinkRunnerContext(RunnerContext):
@@ -82,6 +83,28 @@ class FlinkRunnerContext(RunnerContext):
             err_msg = "Failed to get short-term memory of runner context"
             raise RuntimeError(err_msg) from e
 
+    @override
+    def get_agent_metric_group(self) -> FlinkMetricGroup:
+        """Get the metric group for flink agents.
+
+        Returns:
+        -------
+        FlinkMetricGroup
+            The metric group shared across all actions.
+        """
+        return FlinkMetricGroup(self._j_runner_context.getAgentMetricGroup())
+
+    @override
+    def get_action_metric_group(self) -> FlinkMetricGroup:
+        """Get the individual metric group dedicated for each action.
+
+        Returns:
+        -------
+        FlinkMetricGroup
+            The individual metric group specific to the current action.
+        """
+        return FlinkMetricGroup(self._j_runner_context.getActionMetricGroup())
+
 
 def create_flink_runner_context(j_runner_context: Any, agent_plan_json: str) 
-> FlinkRunnerContext:
     """Used to create a FlinkRunnerContext Python object in Pemja 
environment."""
diff --git a/python/flink_agents/runtime/local_runner.py 
b/python/flink_agents/runtime/local_runner.py
index d025386..6d251f8 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -25,6 +25,7 @@ from typing_extensions import override
 from flink_agents.api.agent import Agent
 from flink_agents.api.event import Event, InputEvent, OutputEvent
 from flink_agents.api.memory_object import MemoryObject
+from flink_agents.api.metric_group import MetricGroup
 from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.api.runner_context import RunnerContext
 from flink_agents.plan.agent_plan import AgentPlan
@@ -115,6 +116,18 @@ class LocalRunnerContext(RunnerContext):
         """
         return self._short_term_memory
 
+    @override
+    def get_agent_metric_group(self) -> MetricGroup:
+        # TODO: Support metric mechanism for local agent execution.
+        err_msg = "Metric mechanism is not supported for local agent execution 
yet."
+        raise NotImplementedError(err_msg)
+
+    @override
+    def get_action_metric_group(self) -> MetricGroup:
+        # TODO: Support metric mechanism for local agent execution.
+        err_msg = "Metric mechanism is not supported for local agent execution 
yet."
+        raise NotImplementedError(err_msg)
+
 class LocalRunner(AgentRunner):
     """Agent runner implementation for local execution, which is
     convenient for debugging.
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index 7f6ec22..75e2309 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -22,6 +22,7 @@ import org.apache.flink.agents.api.context.MemoryObject;
 import org.apache.flink.agents.api.context.RunnerContext;
 import org.apache.flink.agents.plan.utils.JsonUtils;
 import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
+import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
 import org.apache.flink.api.common.state.MapState;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.flink.util.Preconditions;
@@ -38,8 +39,29 @@ public class RunnerContextImpl implements RunnerContext {
     protected final List<Event> pendingEvents = new ArrayList<>();
     protected final MapState<String, MemoryObjectImpl.MemoryItem> store;
 
-    public RunnerContextImpl(MapState<String, MemoryObjectImpl.MemoryItem> 
store) {
+    protected final FlinkAgentsMetricGroupImpl agentMetricGroup;
+
+    protected String actionName;
+
+    public RunnerContextImpl(
+            MapState<String, MemoryObjectImpl.MemoryItem> store,
+            FlinkAgentsMetricGroupImpl agentMetricGroup) {
         this.store = store;
+        this.agentMetricGroup = agentMetricGroup;
+    }
+
+    public void setActionName(String actionName) {
+        this.actionName = actionName;
+    }
+
+    @Override
+    public FlinkAgentsMetricGroupImpl getAgentMetricGroup() {
+        return agentMetricGroup;
+    }
+
+    @Override
+    public FlinkAgentsMetricGroupImpl getActionMetricGroup() {
+        return agentMetricGroup.getSubGroup(actionName);
     }
 
     @Override
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInActionMetrics.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInActionMetrics.java
new file mode 100644
index 0000000..2a518ef
--- /dev/null
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInActionMetrics.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.agents.runtime.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+
+/**
+ * Built-in metrics for a single action, registered on the FlinkAgentsMetricGroupImpl passed to
+ * the constructor. It provides metrics for the total number of actions executed and the number of
+ * actions executed per second.
+ */
+public class BuiltInActionMetrics {
+
+    private final Meter numOfActionsExecutedPerSec;
+
+    public BuiltInActionMetrics(FlinkAgentsMetricGroupImpl parentMetricGroup) {
+        Counter numOfActionsExecuted = 
parentMetricGroup.getCounter("numOfActionsExecuted");
+        this.numOfActionsExecutedPerSec =
+                parentMetricGroup.getMeter("numOfActionsExecutedPerSec", 
numOfActionsExecuted);
+    }
+
+    /** Marks that an action has finished executing. */
+    public void markActionExecuted() {
+        numOfActionsExecutedPerSec.markEvent();
+    }
+}
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
new file mode 100644
index 0000000..0e96573
--- /dev/null
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/BuiltInMetrics.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.agents.runtime.metrics;
+
+import org.apache.flink.agents.plan.AgentPlan;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Meter;
+
+import java.util.HashMap;
+
+/**
+ * Represents a group of built-in metrics for monitoring the performance and behavior of a flink
+ * agent job. This class is responsible for collecting and managing various metrics such as the
+ * number of events processed, the number of actions executed, and the corresponding per-second
+ * rates.
+ */
+public class BuiltInMetrics {
+
+    private final Meter numOfEventProcessedPerSec;
+
+    private final Meter numOfActionsExecutedPerSec;
+
+    private final HashMap<String, BuiltInActionMetrics> actionMetricGroups;
+
+    public BuiltInMetrics(FlinkAgentsMetricGroupImpl parentMetricGroup, 
AgentPlan agentPlan) {
+        Counter numOfEventsProcessed = 
parentMetricGroup.getCounter("numOfEventProcessed");
+        this.numOfEventProcessedPerSec =
+                parentMetricGroup.getMeter("numOfEventProcessedPerSec", 
numOfEventsProcessed);
+
+        Counter numOfActionsExecuted = 
parentMetricGroup.getCounter("numOfActionsExecuted");
+        this.numOfActionsExecutedPerSec =
+                parentMetricGroup.getMeter("numOfActionsExecutedPerSec", 
numOfActionsExecuted);
+
+        this.actionMetricGroups = new HashMap<>();
+        for (String actionName : agentPlan.getActions().keySet()) {
+            actionMetricGroups.put(
+                    actionName,
+                    new 
BuiltInActionMetrics(parentMetricGroup.getSubGroup(actionName)));
+        }
+    }
+
+    /** Records the occurrence of an event, increasing the count of events 
processed per second. */
+    public void markEventProcessed() {
+        numOfEventProcessedPerSec.markEvent();
+    }
+
+    /**
+     * Marks that an action has finished executing. Marks an event on the agent-level executed
+     * meter and on the built-in metrics of the given action.
+     */
+    public void markActionExecuted(String actionName) {
+        numOfActionsExecutedPerSec.markEvent();
+        actionMetricGroups.get(actionName).markActionExecuted();
+    }
+}
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java
new file mode 100644
index 0000000..6b0a236
--- /dev/null
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImpl.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.agents.runtime.metrics;
+
+import org.apache.flink.agents.api.metrics.FlinkAgentsMetricGroup;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
+
+import java.util.HashMap;
+
+/**
+ * {@link FlinkAgentsMetricGroup} implementation built on {@link ProxyMetricGroup}.
+ *
+ * <p>Sub groups, gauges, counters, meters and histograms are cached by name: the first request
+ * for a name registers the metric on the wrapped group, and later requests for the same name
+ * return the cached instance.
+ */
+public class FlinkAgentsMetricGroupImpl extends ProxyMetricGroup<MetricGroup>
+        implements FlinkAgentsMetricGroup {
+
+    /** Time span, in seconds, of meters created without an explicit counter. */
+    private static final int DEFAULT_METER_TIME_SPAN_SECONDS = 60;
+
+    /** Window size of histograms created without an explicit window size. */
+    private static final int DEFAULT_HISTOGRAM_WINDOW_SIZE = 100;
+
+    private final HashMap<String, FlinkAgentsMetricGroupImpl> subMetricGroups = new HashMap<>();
+
+    private final HashMap<String, UpdatableGaugeImpl> gauges = new HashMap<>();
+
+    private final HashMap<String, Counter> counters = new HashMap<>();
+
+    private final HashMap<String, Meter> meters = new HashMap<>();
+
+    private final HashMap<String, Histogram> histograms = new HashMap<>();
+
+    public FlinkAgentsMetricGroupImpl(MetricGroup parentMetricGroup) {
+        super(parentMetricGroup);
+    }
+
+    /** Returns the sub group called {@code name}, creating it on first request. */
+    public FlinkAgentsMetricGroupImpl getSubGroup(String name) {
+        return subMetricGroups.computeIfAbsent(
+                name, key -> new FlinkAgentsMetricGroupImpl(super.addGroup(key)));
+    }
+
+    /** Returns the gauge called {@code name}, registering a new one on first request. */
+    public UpdatableGaugeImpl getGauge(String name) {
+        return gauges.computeIfAbsent(name, key -> super.gauge(key, new UpdatableGaugeImpl()));
+    }
+
+    /** Returns the counter called {@code name}, registering a new one on first request. */
+    public Counter getCounter(String name) {
+        return counters.computeIfAbsent(name, key -> super.counter(key));
+    }
+
+    /**
+     * Returns the meter called {@code name}. On first request a {@link MeterView} with the
+     * default time span is registered.
+     */
+    public Meter getMeter(String name) {
+        return meters.computeIfAbsent(
+                name, key -> super.meter(key, new MeterView(DEFAULT_METER_TIME_SPAN_SECONDS)));
+    }
+
+    /**
+     * Returns the meter called {@code name}. On first request a {@link MeterView} over {@code
+     * counter} is registered; if a meter with that name is already cached, it is returned and
+     * {@code counter} is not used.
+     */
+    public Meter getMeter(String name, Counter counter) {
+        return meters.computeIfAbsent(name, key -> super.meter(key, new MeterView(counter)));
+    }
+
+    /** Same as {@link #getHistogram(String, int)} with the default window size. */
+    public Histogram getHistogram(String name) {
+        return getHistogram(name, DEFAULT_HISTOGRAM_WINDOW_SIZE);
+    }
+
+    /**
+     * Returns the histogram called {@code name}. On first request a {@link
+     * DescriptiveStatisticsHistogram} with {@code windowSize} is registered; if a histogram with
+     * that name is already cached, it is returned and {@code windowSize} is not used.
+     */
+    public Histogram getHistogram(String name, int windowSize) {
+        return histograms.computeIfAbsent(
+                name, key -> super.histogram(key, new DescriptiveStatisticsHistogram(windowSize)));
+    }
+}
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/UpdatableGaugeImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/UpdatableGaugeImpl.java
new file mode 100644
index 0000000..1fd7297
--- /dev/null
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/metrics/UpdatableGaugeImpl.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.agents.runtime.metrics;
+
+import org.apache.flink.agents.api.metrics.UpdatableGauge;
+
+/**
+ * Simple {@link UpdatableGauge} that remembers the last value handed to {@code update} and
+ * reports it from {@code getValue}. Until the first update the reported value is {@code null}.
+ *
+ * @param <T> type of the reported value
+ */
+public class UpdatableGaugeImpl<T> implements UpdatableGauge<T> {
+    private T latest;
+
+    /** Replaces the reported value with {@code value}. */
+    @Override
+    public void update(T value) {
+        latest = value;
+    }
+
+    /** Returns the most recently stored value, or {@code null} if nothing was stored yet. */
+    @Override
+    public T getValue() {
+        return latest;
+    }
+}
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index a0d1dc6..cbc7e10 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -27,6 +27,8 @@ import org.apache.flink.agents.plan.PythonFunction;
 import org.apache.flink.agents.runtime.context.RunnerContextImpl;
 import org.apache.flink.agents.runtime.env.PythonEnvironmentManager;
 import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
+import org.apache.flink.agents.runtime.metrics.BuiltInMetrics;
+import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
 import org.apache.flink.agents.runtime.python.event.PythonEvent;
 import org.apache.flink.agents.runtime.python.utils.PythonActionExecutor;
 import org.apache.flink.agents.runtime.utils.EventUtil;
@@ -81,6 +83,10 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
     // PythonActionExecutor for Python actions
     private transient PythonActionExecutor pythonActionExecutor;
 
+    private transient FlinkAgentsMetricGroupImpl metricGroup;
+
+    private transient BuiltInMetrics builtInMetrics;
+
     public ActionExecutionOperator(
             AgentPlan agentPlan, Boolean inputIsJava, ProcessingTimeService 
processingTimeService) {
         this.agentPlan = agentPlan;
@@ -102,7 +108,11 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                         TypeInformation.of(String.class),
                         TypeInformation.of(MemoryObjectImpl.MemoryItem.class));
         shortTermMemState = 
getRuntimeContext().getMapState(shortTermMemStateDescriptor);
-        runnerContext = new RunnerContextImpl(shortTermMemState);
+
+        metricGroup = new FlinkAgentsMetricGroupImpl(getMetricGroup());
+        builtInMetrics = new BuiltInMetrics(metricGroup, agentPlan);
+
+        runnerContext = new RunnerContextImpl(shortTermMemState, metricGroup);
 
         // init PythonActionExecutor
         initPythonActionExecutor();
@@ -121,6 +131,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         events.push(inputEvent);
         while (!events.isEmpty()) {
             Event event = events.pop();
+            builtInMetrics.markEventProcessed();
             List<Action> actions = getActionsTriggeredBy(event);
             if (actions != null && !actions.isEmpty()) {
                 for (Action action : actions) {
@@ -130,22 +141,28 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                     // TODO: Implement asynchronous action execution.
 
                     // execute action and collect output events
-                    LOG.debug("Try execute action {} for event {}.", 
action.getName(), event);
+                    String actionName = action.getName();
+                    LOG.debug("Try execute action {} for event {}.", 
actionName, event);
                     List<Event> actionOutputEvents;
                     if (action.getExec() instanceof JavaFunction) {
+                        runnerContext.setActionName(actionName);
                         action.getExec().call(event, runnerContext);
                         actionOutputEvents = runnerContext.drainEvents();
                     } else if (action.getExec() instanceof PythonFunction) {
                         checkState(event instanceof PythonEvent);
                         actionOutputEvents =
                                 pythonActionExecutor.executePythonFunction(
-                                        (PythonFunction) action.getExec(), 
(PythonEvent) event);
+                                        (PythonFunction) action.getExec(),
+                                        (PythonEvent) event,
+                                        actionName);
                     } else {
                         throw new RuntimeException("Unsupported action type: " 
+ action.getClass());
                     }
+                    builtInMetrics.markActionExecuted(actionName);
 
                     for (Event actionOutputEvent : actionOutputEvents) {
                         if (EventUtil.isOutputEvent(actionOutputEvent)) {
+                            builtInMetrics.markEventProcessed();
                             OUT outputData = 
getOutputFromOutputEvent(actionOutputEvent);
                             LOG.debug(
                                     "Collect output data {} for input {} in 
action {}.",
@@ -190,7 +207,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                     new PythonActionExecutor(
                             pythonEnvironmentManager,
                             new ObjectMapper().writeValueAsString(agentPlan));
-            pythonActionExecutor.open(shortTermMemState);
+            pythonActionExecutor.open(shortTermMemState, metricGroup);
         }
     }
 
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
index 455208f..d9a20fc 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
@@ -22,6 +22,7 @@ import org.apache.flink.agents.api.context.MemoryObject;
 import org.apache.flink.agents.api.context.RunnerContext;
 import org.apache.flink.agents.runtime.context.RunnerContextImpl;
 import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
+import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
 import org.apache.flink.agents.runtime.python.event.PythonEvent;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.util.Preconditions;
@@ -32,8 +33,10 @@ import javax.annotation.concurrent.NotThreadSafe;
 @NotThreadSafe
 public class PythonRunnerContextImpl extends RunnerContextImpl {
 
-    public PythonRunnerContextImpl(MapState<String, 
MemoryObjectImpl.MemoryItem> store) {
-        super(store);
+    public PythonRunnerContextImpl(
+            MapState<String, MemoryObjectImpl.MemoryItem> store,
+            FlinkAgentsMetricGroupImpl agentMetricGroup) {
+        super(store, agentMetricGroup);
     }
 
     @Override
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 1dcdf5b..905ddb6 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -22,6 +22,7 @@ import org.apache.flink.agents.plan.PythonFunction;
 import org.apache.flink.agents.runtime.env.EmbeddedPythonEnvironment;
 import org.apache.flink.agents.runtime.env.PythonEnvironmentManager;
 import org.apache.flink.agents.runtime.memory.MemoryObjectImpl;
+import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
 import org.apache.flink.agents.runtime.python.context.PythonRunnerContextImpl;
 import org.apache.flink.agents.runtime.python.event.PythonEvent;
 import org.apache.flink.agents.runtime.utils.EventUtil;
@@ -59,7 +60,9 @@ public class PythonActionExecutor {
         this.agentPlanJson = agentPlanJson;
     }
 
-    public void open(MapState<String, MemoryObjectImpl.MemoryItem> 
shortTermMemState)
+    public void open(
+            MapState<String, MemoryObjectImpl.MemoryItem> shortTermMemState,
+            FlinkAgentsMetricGroupImpl metricGroup)
             throws Exception {
         environmentManager.open();
         EmbeddedPythonEnvironment env = environmentManager.createEnvironment();
@@ -67,7 +70,7 @@ public class PythonActionExecutor {
         interpreter = env.getInterpreter();
         interpreter.exec(PYTHON_IMPORTS);
 
-        runnerContext = new PythonRunnerContextImpl(shortTermMemState);
+        runnerContext = new PythonRunnerContextImpl(shortTermMemState, 
metricGroup);
 
         // TODO: remove the set and get runner context after updating pemja to 
version 0.5.3
         Object pythonRunnerContextObject =
@@ -75,9 +78,10 @@ public class PythonActionExecutor {
         interpreter.set(FLINK_RUNNER_CONTEXT_VAR_NAME, 
pythonRunnerContextObject);
     }
 
-    public List<Event> executePythonFunction(PythonFunction function, 
PythonEvent event)
-            throws Exception {
+    public List<Event> executePythonFunction(
+            PythonFunction function, PythonEvent event, String actionName) 
throws Exception {
         runnerContext.checkNoPendingEvents();
+        runnerContext.setActionName(actionName);
         function.setInterpreter(interpreter);
 
         // TODO: remove the set and get runner context after updating pemja to 
version 0.5.3
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java
new file mode 100644
index 0000000..d58f8cd
--- /dev/null
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/FlinkAgentsMetricGroupImplTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.agents.runtime.metrics;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Tests for {@link FlinkAgentsMetricGroupImpl}.
+ *
+ * <p>Each test requests a metric twice under the same name and checks that the second request
+ * returns the very same instance, i.e. that the group caches metrics by name.
+ */
+public class FlinkAgentsMetricGroupImplTest {
+
+    private FlinkAgentsMetricGroupImpl metricGroup;
+
+    @BeforeEach
+    void setUp() {
+        MetricGroup parentMetricGroup =
+                UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+        metricGroup = new FlinkAgentsMetricGroupImpl(parentMetricGroup);
+    }
+
+    @Test
+    void testGetSubGroup() {
+        String name = "testGroup";
+        FlinkAgentsMetricGroupImpl result = metricGroup.getSubGroup(name);
+
+        assertNotNull(result);
+        assertSame(result, metricGroup.getSubGroup(name));
+    }
+
+    @Test
+    void testGetGauge() {
+        String name = "testGauge";
+        UpdatableGaugeImpl<?> result = metricGroup.getGauge(name);
+
+        assertNotNull(result);
+        assertSame(result, metricGroup.getGauge(name));
+    }
+
+    @Test
+    void testGetCounter() {
+        String name = "testCounter";
+        Counter result = metricGroup.getCounter(name);
+
+        assertNotNull(result);
+        assertSame(result, metricGroup.getCounter(name));
+    }
+
+    @Test
+    void testGetMeterWithoutCounter() {
+        String name = "testMeter";
+        Meter result = metricGroup.getMeter(name);
+
+        assertNotNull(result);
+        assertSame(result, metricGroup.getMeter(name));
+    }
+
+    @Test
+    void testGetMeterWithCounter() {
+        String name = "testMeter";
+        Counter counter = new SimpleCounter();
+
+        Meter result = metricGroup.getMeter(name, counter);
+
+        assertNotNull(result);
+        // Both getMeter overloads share one cache, so the lookup by name alone must hit it.
+        assertSame(result, metricGroup.getMeter(name));
+    }
+
+    @Test
+    void testGetHistogramWithoutWindowSize() {
+        String name = "testHistogram";
+        Histogram result = metricGroup.getHistogram(name);
+
+        assertNotNull(result);
+        assertSame(result, metricGroup.getHistogram(name));
+    }
+
+    @Test
+    void testGetHistogramWithWindowSize() {
+        String name = "testHistogram";
+        int windowSize = 200;
+        Histogram result = metricGroup.getHistogram(name, windowSize);
+
+        assertNotNull(result);
+        // Both getHistogram overloads share one cache, so the lookup by name alone must hit it.
+        assertSame(result, metricGroup.getHistogram(name));
+    }
+}
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/UpdatableGaugeImplTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/UpdatableGaugeImplTest.java
new file mode 100644
index 0000000..da4519c
--- /dev/null
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/metrics/UpdatableGaugeImplTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.agents.runtime.metrics;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/** Tests for {@link UpdatableGaugeImpl}. */
+class UpdatableGaugeImplTest {
+
+    /** Test updating and getting integer values. */
+    @Test
+    void testUpdateAndGetInteger() {
+        UpdatableGaugeImpl<Integer> intGauge = new UpdatableGaugeImpl<>();
+
+        // Test with positive value
+        intGauge.update(42);
+        assertEquals(42, intGauge.getValue(), "Should return the updated integer value");
+
+        // Test with zero
+        intGauge.update(0);
+        assertEquals(0, intGauge.getValue(), "Should return zero");
+
+        // Test with negative value
+        intGauge.update(-10);
+        assertEquals(-10, intGauge.getValue(), "Should return the negative integer value");
+
+        // Test with maximum value
+        intGauge.update(Integer.MAX_VALUE);
+        assertEquals(Integer.MAX_VALUE, intGauge.getValue(), "Should return Integer.MAX_VALUE");
+
+        // Test with minimum value
+        intGauge.update(Integer.MIN_VALUE);
+        assertEquals(Integer.MIN_VALUE, intGauge.getValue(), "Should return Integer.MIN_VALUE");
+    }
+
+    /** Test updating and getting double values. */
+    @Test
+    void testUpdateAndGetDouble() {
+        UpdatableGaugeImpl<Double> doubleGauge = new UpdatableGaugeImpl<>();
+
+        // Test with positive value
+        doubleGauge.update(3.14);
+        assertEquals(3.14, doubleGauge.getValue(), 0.001, "Should return the updated double value");
+
+        // Test with zero
+        doubleGauge.update(0.0);
+        assertEquals(0.0, doubleGauge.getValue(), 0.001, "Should return zero");
+
+        // Test with negative value
+        doubleGauge.update(-2.5);
+        assertEquals(
+                -2.5, doubleGauge.getValue(), 0.001, "Should return the negative double value");
+
+        // Test with maximum value
+        doubleGauge.update(Double.MAX_VALUE);
+        assertEquals(
+                Double.MAX_VALUE, doubleGauge.getValue(), 0.001, "Should return Double.MAX_VALUE");
+
+        // Test with the smallest positive value (Double.MIN_VALUE is not the most negative
+        // double). Compared exactly: a delta of 0.001 would also accept 0.0.
+        doubleGauge.update(Double.MIN_VALUE);
+        assertEquals(
+                Double.valueOf(Double.MIN_VALUE),
+                doubleGauge.getValue(),
+                "Should return Double.MIN_VALUE");
+    }
+
+    /** Test updating and getting float values. */
+    @Test
+    void testUpdateAndGetFloat() {
+        UpdatableGaugeImpl<Float> floatGauge = new UpdatableGaugeImpl<>();
+
+        // Test with positive value
+        floatGauge.update(2.5f);
+        assertEquals(2.5f, floatGauge.getValue(), 0.001f, "Should return the updated float value");
+
+        // Test with zero
+        floatGauge.update(0.0f);
+        assertEquals(0.0f, floatGauge.getValue(), 0.001f, "Should return zero");
+
+        // Test with negative value
+        floatGauge.update(-1.5f);
+        assertEquals(
+                -1.5f, floatGauge.getValue(), 0.001f, "Should return the negative float value");
+
+        // Test with maximum value
+        floatGauge.update(Float.MAX_VALUE);
+        assertEquals(
+                Float.MAX_VALUE, floatGauge.getValue(), 0.001f, "Should return Float.MAX_VALUE");
+
+        // Test with the smallest positive value (Float.MIN_VALUE is not the most negative
+        // float). Compared exactly: a delta of 0.001f would also accept 0.0f.
+        floatGauge.update(Float.MIN_VALUE);
+        assertEquals(
+                Float.valueOf(Float.MIN_VALUE),
+                floatGauge.getValue(),
+                "Should return Float.MIN_VALUE");
+    }
+
+    /** Test updating and getting long values. */
+    @Test
+    void testUpdateAndGetLong() {
+        UpdatableGaugeImpl<Long> longGauge = new UpdatableGaugeImpl<>();
+
+        // Test with positive value
+        longGauge.update(10000000000L);
+        assertEquals(10000000000L, longGauge.getValue(), "Should return the updated long value");
+
+        // Test with zero
+        longGauge.update(0L);
+        assertEquals(0L, longGauge.getValue(), "Should return zero");
+
+        // Test with negative value
+        longGauge.update(-5000000000L);
+        assertEquals(-5000000000L, longGauge.getValue(), "Should return the negative long value");
+
+        // Test with maximum value
+        longGauge.update(Long.MAX_VALUE);
+        assertEquals(Long.MAX_VALUE, longGauge.getValue(), "Should return Long.MAX_VALUE");
+
+        // Test with minimum value
+        longGauge.update(Long.MIN_VALUE);
+        assertEquals(Long.MIN_VALUE, longGauge.getValue(), "Should return Long.MIN_VALUE");
+    }
+
+    /** Test initial state. */
+    @Test
+    void testInitialState() {
+        UpdatableGaugeImpl<Integer> intGauge = new UpdatableGaugeImpl<>();
+        assertNull(intGauge.getValue(), "Initial value should be null");
+    }
+}

Reply via email to