GitHub user GreatEugenius edited a discussion: Flink Agents Metrics Design
# Introduction
In Flink Agents, we aim to introduce a metrics mechanism that allows users not
only to understand their jobs through output results but also to easily collect
and analyze the usage of different Events, Actions, Models, and Tools within
the Agent jobs by integrating with external systems. When necessary, we will
also set up monitoring and alerting for key metrics—such as abnormal spikes in
Token usage over a short period. This will help users better understand the
runtime status of their jobs, enabling them to refine and optimize their
workflows to achieve desired outcomes.
We plan to integrate Flink Agents Metrics into Flink’s built-in metric system.
Through the Flink Web UI, users can conveniently view relevant parameters.
Additionally, leveraging the rich set of Metric Reporters provided by the Flink
metric system, users will be able to export Flink runtime metrics to external
systems, enabling visualization and setting up monitoring alerts.
# Flink Agents Metric Design
In Flink Agent jobs, the `ActionExecutionOperator` plays a central role in the
entire Agent lifecycle — from event parsing to action execution. To align with
Flink’s native metric architecture, we design the metric system by leveraging a
`ProxyMetricGroup` associated with the `ActionExecutionOperator`. This ensures
that all metrics defined under the `FlinkAgentMetricGroup` are automatically
registered at the operator-level (`OperatorMetricGroup`), enabling seamless
integration with Flink’s built-in monitoring capabilities.
The Flink Agents metrics system is logically divided into two main categories:
- **Builtin Metrics**: These are predefined metrics designed for common use
cases, such as tracking agent throughput, latency, and resource utilization.
They are implemented directly within core components like the
ActionExecutionOperator and BuiltInAction, ensuring automatic collection during
job execution.
- **User-defined Metrics**: For custom or scenario-specific requirements, the
system supports user-defined metrics via an extensible API. Users can create
and register their own metrics using the provided MetricGroup interface,
offering flexibility for advanced monitoring needs.
## Flink Agents Builtin Metrics
To provide a comprehensive and extensible metric system, the Flink Agents
project defines a set of Builtin Metrics, which are automatically collected
during job execution. These metrics are implemented directly within core
components such as `ActionExecutionOperator` and `BuiltInAction`, ensuring
minimal user configuration while offering rich visibility into agent behavior.
### Survey of Metric Support in Agent-Based Frameworks
When developing metrics for Flink Agents, we analyzed the types of metrics
supported by mainstream frameworks such as LlamaIndex and LangChain. Based on
this analysis, metrics can be broadly categorized into three groups:
#### 1. **Performance Monitoring Metrics**
These metrics track system performance characteristics such as response time
and throughput. They are commonly collected with tools like Prometheus and
visualized in Grafana, and they provide insight into runtime behavior.
#### 2. **Resource and Cost Metrics**
These metrics monitor resource usage, particularly in relation to model token
consumption. They are essential for cost tracking and optimization.
Common metrics include:
- **Token Usage**:
  - `total_tokens`: total number of tokens
  - `prompt_tokens`: number of prompt tokens
  - `completion_tokens`: number of generated tokens
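
As a rough illustration of how these three fields relate, the sketch below
accumulates usage across several model calls. `TokenUsage` and `add_response`
are hypothetical names invented for this example, and the dictionary shape of
a response's usage block is an assumption based on the field names listed
above; real providers may report usage differently.

```python
from dataclasses import dataclass


@dataclass
class TokenUsage:
    """Running totals of token consumption (hypothetical helper)."""

    prompt_tokens: int = 0
    completion_tokens: int = 0

    @property
    def total_tokens(self) -> int:
        # The total is derived, so it cannot drift from its two parts.
        return self.prompt_tokens + self.completion_tokens

    def add_response(self, usage: dict) -> None:
        """Add the usage block of one model response (assumed dict shape)."""
        self.prompt_tokens += int(usage.get("prompt_tokens", 0))
        self.completion_tokens += int(usage.get("completion_tokens", 0))


usage = TokenUsage()
usage.add_response({"prompt_tokens": 12, "completion_tokens": 30})
usage.add_response({"prompt_tokens": 8, "completion_tokens": 5})
print(usage.total_tokens)  # 55
```

Deriving `total_tokens` rather than storing it is only one possible choice; a
model that reports its own total could be recorded as is.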
#### 3. **Model Evaluation Metrics**
Model Evaluation Metrics are used to assess the quality of model input and
output, such as correctness, semantic similarity, and preference alignment.
These metrics are typically applied in offline evaluation scenarios rather than
real-time monitoring.
Common types include accuracy checks, text similarity measurements (e.g., based
on string or embedding distances), and pairwise comparisons between model
outputs for preference scoring or ranking.
### Proposed Scope of Builtin Metrics in Flink Agents
The MVP version of Flink Agents focuses on implementing Performance Monitoring
Metrics and Resource & Cost Metrics, which are most relevant for real-time
monitoring and observability. Support for Model Evaluation Metrics is planned
for future versions.
We have designed the following metrics at different levels of granularity,
using a two-dimensional structure based on component type (Agent, Event,
Action, Model, Tool) and metric type (Count, Meter, Histogram):
| **Component Type** | **Count** | **Meter** | **Histogram** |
| --- | :--- | --- | --- |
| Agent<br />(Operator builtin, already implemented) | NumOfInput<br />NumOfOutput | NumOfInputPerSec<br />NumOfOutputPerSec | |
| Event | NumOfEventProcessed | NumOfEventProcessedPerSec | |
| Action | NumOfActionsExecuting<br />NumOfActionsExecuted | NumOfActionsExecutedPerSec | ActionExecutionTime |
| Model | NumOfModelCalls<br />NumOfToken<br />NumOfPromptToken<br />NumCompletionToken | NumOfModelCallsPerSec<br />NumOfTokenPerSec<br />NumOfPromptTokenPerSec<br />NumCompletionTokenPerSec | ModelCallsDuration |
| Tool | NumOfToolCall | NumOfToolCallPerSec | ToolCallDuration |
**Note**: Token statistics depend on the output returned by the model, as
different models use different tokenizers.
For Builtin Metrics, we provide metrics for Event, ModelChat, and ToolCall at a
general level (without type distinction), as well as per-action metrics. For
example, the `NumOfEvent` metric counts the total number of events without
differentiation by event type, while the `NumOfAction` metric tracks both the
total number of actions executed and the count per action type.
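
The difference between general and per-action metrics can be pictured with a
small in-memory sketch. `ActionCounts` is a hypothetical stand-in, not the
Flink Agents implementation; it only shows that a single execution updates
both the overall total and the count for that action's name.

```python
from collections import defaultdict


class ActionCounts:
    """Tracks a total count plus one count per action name (illustrative only)."""

    def __init__(self) -> None:
        self.total = 0
        self.per_action = defaultdict(int)

    def mark_executed(self, action_name: str) -> None:
        # One execution contributes to both levels of granularity.
        self.total += 1
        self.per_action[action_name] += 1


counts = ActionCounts()
for name in ["first_action", "second_action", "first_action"]:
    counts.mark_executed(name)

print(counts.total)                       # 3
print(counts.per_action["first_action"])  # 2
```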
### Implementation Example: NumOfEvent Builtin Metric
To illustrate how Builtin Metrics are implemented, consider the `NumOfEvent`
metric. In the `ActionExecutionOperator`, event counting can be achieved as
follows:
```java
public class ActionExecutionOperator<IN, OUT> extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<IN, OUT> {

    private transient FlinkAgentsMetricGroup metricGroup;

    @Override
    public void open() throws Exception {
        super.open();
        // Initialize metric group with Flink's operator-level metric context
        metricGroup = new FlinkAgentsMetricGroup(getMetricGroup());
    }

    @Override
    public void processElement(StreamRecord<IN> record) throws Exception {
        // `events` holds the pending events derived from the input record
        // (its construction is omitted in this excerpt).
        while (!events.isEmpty()) {
            Event event = events.pop();
            // Record event occurrence using the metric group
            metricGroup.markEvent(event);
        }
    }
}
```
This implementation ensures that every event processed by the operator is
counted and reported via the Flink metric system, enabling visibility through
the Web UI or external monitoring tools.
## Flink Agents User-defined Metrics
In Flink Agents, users implement their logic by defining custom Actions that
respond to various Events throughout the Agent lifecycle. To support
user-defined metrics, we introduce two new methods in the `RunnerContext`:
`get_agent_metric_group()` and `get_action_metric_group()`. They allow users to
create or update metrics shared across the whole agent as well as independent
metrics scoped to a single action.
`get_agent_metric_group()` gives any Action access to the operator-level
metric group. With this capability, users can register and update metrics
directly at the operator level while defining their Flink Agent jobs. The
resulting metric identifier is
`<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.<metric_name>`.
`get_action_metric_group()` provides per-action metrics, with the identifier
`<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.<action_name>.<metric_name>`.
Additionally, the system supports the creation of sub-metric groups to
distinguish metrics generated by different Actions. These sub-groups enable
more granular tracking and use the identifier
`<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>.<user_defined_group_name>.<metric_name>`.
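
To make these identifier shapes concrete, here is a minimal sketch that joins
the scope components with dots. The function name `metric_identifier` and all
sample values (`host-1`, `tm-0`, `review-job`, `agent-op`) are made up for
illustration. Flink's scope format is configurable, so identifiers in a real
deployment may not follow this fixed layout.

```python
from typing import Optional


def metric_identifier(
    host: str,
    tm_id: str,
    job_name: str,
    operator_name: str,
    subtask_index: int,
    metric_name: str,
    group_name: Optional[str] = None,
) -> str:
    """Build a dotted identifier; group_name is an action or user-defined sub-group."""
    parts = [host, "taskmanager", tm_id, job_name, operator_name, str(subtask_index)]
    if group_name is not None:
        parts.append(group_name)
    parts.append(metric_name)
    return ".".join(parts)


agent_level = metric_identifier("host-1", "tm-0", "review-job", "agent-op", 0, "numEvent")
action_level = metric_identifier(
    "host-1", "tm-0", "review-job", "agent-op", 0, "actionLatencyMs",
    group_name="first_action",
)
print(agent_level)   # host-1.taskmanager.tm-0.review-job.agent-op.0.numEvent
print(action_level)  # host-1.taskmanager.tm-0.review-job.agent-op.0.first_action.actionLatencyMs
```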
### **APIs**
#### MetricGroup
The `MetricGroup` interface enables hierarchical metric management through
`get_sub_group(name: str)` and provides dynamic creation/update of four core
metric types: counters (e.g., action counts), meters (e.g., event rates),
histograms (e.g., latency distributions), and gauges (e.g., system state).
These methods allow users to organize and track metrics within custom Actions
while integrating seamlessly with Flink’s operator-level monitoring system.
```python
from abc import ABC, abstractmethod


class MetricGroup(ABC):
    """Abstract base class providing a metric group for action execution.

    This metric group offers access to various metric types.
    """

    @abstractmethod
    def get_sub_group(self, name: str) -> "MetricGroup":
        """Create or retrieve a sub-metric group with the given name.

        Parameters
        ----------
        name : str
            The name of the sub-metric group.
        """

    @abstractmethod
    def get_counter(self, name: str) -> "Counter":
        """Create or retrieve a counter with the given name.

        Parameters
        ----------
        name : str
            The name of the counter.
        """

    @abstractmethod
    def get_meter(self, name: str) -> "Meter":
        """Create or retrieve a meter with the given name.

        Parameters
        ----------
        name : str
            The name of the meter.
        """

    @abstractmethod
    def get_histogram(self, name: str, window_size: int = 100) -> "Histogram":
        """Create or retrieve a histogram with the given name and window size.

        Parameters
        ----------
        name : str
            The name of the histogram.
        window_size : int, optional
            The sliding window size for histogram statistics.
        """

    @abstractmethod
    def get_gauge(self, name: str) -> "Gauge":
        """Create or retrieve a gauge with the given name.

        Parameters
        ----------
        name : str
            The name of the gauge.
        """
```
**Note**: In `FlinkAgentsMetricGroup`, we maintain four internal maps keyed by
metric name for each type (`Counter`, `Meter`, `Histogram`, `Gauge`). If a
metric with a given name does not exist, it will be automatically created.
Otherwise, the existing metric is returned.
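
The get-or-create behavior described in this note can be sketched with plain
dictionaries. This is not the `FlinkAgentsMetricGroup` source:
`InMemoryMetricGroup` and `SimpleCounter` are hypothetical names, and only the
counter and sub-group maps are shown to keep the example short.

```python
from typing import Dict


class SimpleCounter:
    """Minimal counter used only for this illustration."""

    def __init__(self) -> None:
        self.count = 0

    def inc(self, n: int = 1) -> None:
        self.count += n


class InMemoryMetricGroup:
    """Returns the existing metric for a name, creating it on first access."""

    def __init__(self) -> None:
        self._counters: Dict[str, SimpleCounter] = {}
        self._sub_groups: Dict[str, "InMemoryMetricGroup"] = {}

    def get_counter(self, name: str) -> SimpleCounter:
        if name not in self._counters:
            self._counters[name] = SimpleCounter()
        return self._counters[name]

    def get_sub_group(self, name: str) -> "InMemoryMetricGroup":
        if name not in self._sub_groups:
            self._sub_groups[name] = InMemoryMetricGroup()
        return self._sub_groups[name]


group = InMemoryMetricGroup()
group.get_counter("numEvent").inc()
group.get_counter("numEvent").inc()  # same object as in the first call
group.get_sub_group("myEvent").get_counter("numEvent").inc()

print(group.get_counter("numEvent").count)                           # 2
print(group.get_sub_group("myEvent").get_counter("numEvent").count)  # 1
```

Because lookups are keyed by name within a group, the same metric name can be
reused in a sub-group without colliding with the parent's metric.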
#### Core Metric Types
##### Counter
A counter is used to measure the number of occurrences of an event.
```python
class Counter(ABC):
    """A counter that measures the count of events."""

    @abstractmethod
    def inc(self, n: int = 1) -> None:
        """Increment the current count by the given value."""

    @abstractmethod
    def dec(self, n: int = 1) -> None:
        """Decrement the current count by the given value."""

    @abstractmethod
    def get_count(self) -> int:
        """Return the current count."""
```
##### Meter
A meter is used to track the rate of events over time (throughput).
```python
class Meter(ABC):
    """A meter that measures throughput."""

    @abstractmethod
    def mark(self, n: int = 1) -> None:
        """Mark the occurrence of one or more events."""

    @abstractmethod
    def get_rate(self) -> float:
        """Return the current event rate per second."""
```
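
One simple way to satisfy this interface is to divide the number of marked
events by the elapsed time. The sketch below does that with an injectable
clock so the result is reproducible. `SimpleMeter` is a hypothetical class,
and the "average since creation" rate is an assumption chosen for brevity. A
production meter would more likely average over a recent time window.

```python
import time
from typing import Callable


class SimpleMeter:
    """Average events per second since creation (illustrative only)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._start = clock()
        self._count = 0

    def mark(self, n: int = 1) -> None:
        self._count += n

    def get_rate(self) -> float:
        elapsed = self._clock() - self._start
        # Avoid division by zero when no time has passed yet.
        return self._count / elapsed if elapsed > 0 else 0.0


# A fake clock makes the example deterministic.
fake_now = [0.0]
meter = SimpleMeter(clock=lambda: fake_now[0])
meter.mark()
meter.mark(9)
fake_now[0] = 5.0  # pretend five seconds have passed
print(meter.get_rate())  # 2.0
```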
##### Histogram
A histogram is used to collect and analyze distributions of values (e.g.,
latencies).
```python
class Histogram(ABC):
    """A histogram for recording values and computing statistical summaries."""

    @abstractmethod
    def update(self, value: int) -> None:
        """Record a new value into the histogram."""

    @abstractmethod
    def get_mean(self) -> float:
        """Return the average value."""

    @abstractmethod
    def get_max(self) -> int:
        """Return the maximum recorded value."""

    @abstractmethod
    def get_min(self) -> int:
        """Return the minimum recorded value."""
```
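
The `window_size` parameter of `get_histogram` suggests that statistics are
computed over a sliding window. A minimal sketch, assuming the window keeps
only the most recent `window_size` values, is shown below.
`SlidingWindowHistogram` is a hypothetical name, and returning zero for an
empty histogram is a choice made for this example only.

```python
from collections import deque


class SlidingWindowHistogram:
    """Keeps the latest `window_size` values and summarizes them (illustrative)."""

    def __init__(self, window_size: int = 100) -> None:
        # A bounded deque silently drops the oldest value when full.
        self._values = deque(maxlen=window_size)

    def update(self, value: int) -> None:
        self._values.append(value)

    def get_mean(self) -> float:
        return sum(self._values) / len(self._values) if self._values else 0.0

    def get_max(self) -> int:
        return max(self._values) if self._values else 0

    def get_min(self) -> int:
        return min(self._values) if self._values else 0


hist = SlidingWindowHistogram(window_size=3)
for latency_ms in [10, 20, 30, 40]:
    hist.update(latency_ms)  # 10 falls out once the fourth value arrives

print(hist.get_mean(), hist.get_min(), hist.get_max())  # 30.0 20 40
```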
##### Gauge
A gauge is used to record a single value at a point in time (e.g., current
system load).
```python
class Gauge(ABC):
    """A gauge for recording a single value at a specific moment in time."""

    @abstractmethod
    def update(self, value: str) -> None:
        """Update the gauge with the given value."""

    @abstractmethod
    def get_value(self) -> str:
        """Return the current value of the gauge."""
```
#### RunnerContext
```python
class RunnerContext(ABC):
    # ...

    @abstractmethod
    def get_agent_metric_group(self) -> MetricGroup:
        """Get the global metric group for flink agents.

        Returns
        -------
        MetricGroup
            The global metric group shared across all actions.
        """

    @abstractmethod
    def get_action_metric_group(self) -> MetricGroup:
        """Get the individual metric group dedicated for each action.

        Returns
        -------
        MetricGroup
            The individual metric group specific to the current action.
        """
```
### **Examples**
```python
import time

# Flink Agents imports (Workflow, action, Event, RunnerContext, ...) are omitted.


class MyWorkflow(Workflow):
    @action(InputEvent)
    @staticmethod
    def first_action(event: Event, ctx: RunnerContext):  # noqa D102
        start_time = time.time_ns()
        input = event.input
        content = input.get_review() + " first action."
        ctx.send_event(MyEvent(value=content))

        # Access the main agent metric group
        metrics = ctx.get_agent_metric_group()

        # Increment counters and meters
        metrics.get_counter("numEvent").inc()
        metrics.get_meter("numEventPerSecond").mark()

        # Access the per-action metric group and record latency in milliseconds
        action_metrics = ctx.get_action_metric_group()
        elapsed_ms = (time.time_ns() - start_time) // 1_000_000
        action_metrics.get_histogram("actionLatencyMs").update(elapsed_ms)

    @action(MyEvent)
    @staticmethod
    def second_action(event: Event, ctx: RunnerContext):  # noqa D102
        input = event.value
        content = input + " second action."
        ctx.send_event(OutputEvent(output=content))

        # Access the main agent metric group
        metrics = ctx.get_agent_metric_group()

        # Update global metrics
        metrics.get_counter("numEvent").inc()
        metrics.get_meter("numEventPerSecond").mark()

        # Create and track metrics for MyEvent using a sub-metric group
        if isinstance(event, MyEvent):
            sub_metrics = metrics.get_sub_group("myEvent")
            sub_metrics.get_counter("numEvent").inc()
            sub_metrics.get_meter("numEventPerSecond").mark()
```
GitHub link: https://github.com/apache/flink-agents/discussions/73