This is an automated email from the ASF dual-hosted git repository.
xuetaoli pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop by this push:
new 5652e783 feat: replace the metric filter by a unified metric filter (#799)
5652e783 is described below
commit 5652e783127c263a8d0f05d9744f1c544c165ac0
Author: dubbo-go-bot <[email protected]>
AuthorDate: Sat Nov 8 13:50:28 2025 +0800
feat: replace the metric filter by a unified metric filter (#799)
* feat: replace the metric filter by a unified metric filter (dubbo-go-pixiu/dubbo-go-pixiu/pull/15)
* fix the ci of constant/filter
* fix the ci of constant/filter
* replace the old defaultName
---------
Co-authored-by: Sirui Huang <[email protected]>
Co-authored-by: Xuetao Li <[email protected]>
---
docs/user/filter/metric.md | 362 +++++++++++++++++++++++
docs/user/filter/metric_CN.md | 362 +++++++++++++++++++++++
pkg/common/constant/filter.go | 9 +
pkg/context/http/context.go | 52 ++++
pkg/context/http/context_test.go | 334 +++++++++++++++++++++
pkg/filter/llm/proxy/filter.go | 20 +-
pkg/filter/metric/config.go | 108 +++++++
pkg/filter/metric/metric.go | 340 +++++++++++++++------
pkg/filter/metric/metric_test.go | 617 ++++++++++++++++++++++++++++++++++++++-
pkg/filter/prometheus/config.go | 4 +
pkg/filter/prometheus/metric.go | 4 +
pkg/prometheus/prometheus.go | 246 ++++++++++++++--
12 files changed, 2327 insertions(+), 131 deletions(-)
diff --git a/docs/user/filter/metric.md b/docs/user/filter/metric.md
new file mode 100644
index 00000000..32644656
--- /dev/null
+++ b/docs/user/filter/metric.md
@@ -0,0 +1,362 @@
+# Metric Reporter Filter (dgp.filter.http.metric)
+
+English | [中文](metric_CN.md)
+
+---
+
+## Overview
+
+The `dgp.filter.http.metric` filter provides unified metric reporting for the Pixiu gateway. It consolidates the functionality of two previous filters (`dgp.filter.http.metric` and `dgp.filter.http.prometheusmetric`) and supports both **Pull** and **Push** modes with OpenTelemetry integration.
+
+> **Note**: This filter defaults to **Push** mode. To use Pull mode, explicitly specify `mode: "pull"` in the configuration.
+
+### Key Features
+
+- **Unified Entry Point**: Single filter for both Pull and Push modes
+- **OpenTelemetry Integration**: Pull mode uses OpenTelemetry for metrics (consistent with Pixiu Tracing)
+- **Context-based Extension**: Other filters can record custom metrics via `HttpContext.RecordMetric()`
+- **Backward Compatible**: Reuses logic from original filters
+
+---
+
+## Modes
+
+### Pull Mode (Recommended)
+
+Metrics are exposed via HTTP endpoint for Prometheus to scrape.
+
+**Characteristics:**
+- Uses OpenTelemetry SDK
+- Metrics exposed via global HTTP endpoint
+- Standard Prometheus pull model
+- Supports dynamic custom metrics from context
+
+**Use Cases:**
+- Long-running services
+- Kubernetes environments
+- Development and testing
+
+### Push Mode
+
+Metrics are actively pushed to Prometheus Push Gateway.
+
+**Characteristics:**
+- Uses Prometheus native SDK
+- Metrics pushed every N requests
+- Batch push reduces network overhead
+- Supports dynamic custom metrics from context (same as Pull mode)
+
+**Use Cases:**
+- Services behind firewalls
+- Short-lived batch jobs
+- Environments that cannot expose inbound ports
+
+---
+
+## Configuration
+
+### Pull Mode
+
+```yaml
+static_resources:
+ listeners:
+ - name: "net/http"
+ protocol_type: "HTTP"
+ address:
+ socket_address:
+ address: "0.0.0.0"
+ port: 8888
+ filter_chains:
+ filters:
+ - name: dgp.filter.httpconnectionmanager
+ config:
+ route_config:
+ routes:
+ - match:
+ prefix: /
+ route:
+ cluster: backend
+ http_filters:
+ # Other filters...
+ - name: dgp.filter.http.httpproxy
+ config: {}
+
+ # MetricReporter must be placed last
+ - name: dgp.filter.http.metric
+ config:
+ mode: "pull"
+
+# Global metric configuration (controls the HTTP endpoint)
+metric:
+ enable: true
+ prometheus_port: 2222 # Access metrics at http://localhost:2222/metrics
+```
+
+### Push Mode
+
+```yaml
+http_filters:
+ - name: dgp.filter.http.metric
+ config:
+ mode: "push"
+ push_config:
+      gateway_url: "http://push-gateway:9091"   # Push Gateway URL (default: http://localhost:9091)
+      job_name: "pixiu"                         # Job name (default: pixiu)
+      push_interval: 100                        # Push every 100 requests (default: 100)
+      metric_path: "/metrics"                   # Push path (default: /metrics)
+```
+
+**Note**: All fields in `push_config` have default values. If omitted, defaults will be applied automatically.
+
+**Minimal Configuration (uses all defaults)**:
+```yaml
+http_filters:
+ - name: dgp.filter.http.metric
+ config:
+ mode: "push"
+ # push_config can be omitted or empty to use all defaults
+```
+
+**Ultra-minimal Configuration (defaults to Push mode)**:
+```yaml
+http_filters:
+ - name: dgp.filter.http.metric
+ config: {}
+ # Defaults to push mode with all default settings
+```
+
+**Default Values**:
+- `gateway_url`: `http://localhost:9091`
+- `job_name`: `pixiu`
+- `push_interval`: `100`
+- `metric_path`: `/metrics`
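+
+These defaults correspond to the `DefaultMetricPush*` constants defined in `pkg/common/constant/filter.go`.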
+
+---
+
+## Built-in Metrics
+
+**Pull and Push modes now use unified metric names and types:**
+
+| Metric Name | Type | Description | Labels | Status |
+|-------------|------|-------------|--------|--------|
+| `pixiu_requests_total` | Counter | Total number of requests | code, method, host, url | ⚠️ Deprecated |
+| `pixiu_request_count` | Counter | Total number of requests | code, method, host, url | ✅ Recommended |
+| `pixiu_request_elapsed` | Counter | Total request elapsed time (milliseconds) | code, method, host, url | ✅ |
+| `pixiu_request_error_count` | Counter | Total error count | code, method, host, url | ✅ |
+| `pixiu_request_content_length` | Counter | Request size (bytes) | code, method, url | ✅ |
+| `pixiu_response_content_length` | Counter | Response size (bytes) | code, method, url | ✅ |
+| `pixiu_process_time_millisec` | Histogram | Request processing time distribution (milliseconds) | code, method, url | ✅ |
+
+**Backward Compatibility**:
+- For backward compatibility, both metrics are currently exported: `pixiu_requests_total` (old) and `pixiu_request_count` (new)
+- Use the new metric name `pixiu_request_count` for new deployments
+- `pixiu_requests_total` will be removed in future versions
+- Both modes use identical metric names for easy switching
+
+---
+
+## Custom Metrics (Extension Feature)
+
+Other filters can record custom metrics that will be automatically collected and reported by MetricReporter.
+
+**Supported in both Pull and Push modes**: Custom metrics recorded via `HttpContext.RecordMetric()` are fully exported in both modes.
+
+### Usage in Custom Filters
+
+```go
+package myfilter
+
+import (
+ "fmt"
+ "time"
+ "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+)
+
+type Filter struct {
+ startTime time.Time
+}
+
+func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
+ f.startTime = time.Now()
+ return filter.Continue
+}
+
+func (f *Filter) Encode(ctx *http.HttpContext) filter.FilterStatus {
+ // Record counter metric
+ ctx.RecordMetric("my_requests_total", "counter", 1.0, map[string]string{
+ "method": ctx.GetMethod(),
+ "status": fmt.Sprintf("%d", ctx.GetStatusCode()),
+ })
+
+ // Record histogram metric
+ latency := time.Since(f.startTime).Milliseconds()
+    ctx.RecordMetric("my_request_duration_ms", "histogram", float64(latency), map[string]string{
+ "endpoint": ctx.GetUrl(),
+ })
+
+ // Record gauge metric
+ ctx.RecordMetric("my_active_connections", "gauge", float64(42), nil)
+
+ return filter.Continue
+}
+```
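+
+The metrics recorded above are buffered on the request context and drained by the metric filter in its Encode phase, which is one reason the metric filter must sit last in `http_filters`. A minimal sketch of that reporter-side pattern, where `export` is a hypothetical callback standing in for the Pull/Push backends:
+
+```go
+// Drain metrics recorded by upstream filters; the reporter may then reset
+// the buffer so the same context is not exported twice.
+for _, m := range ctx.GetAllMetrics() {
+    export(m.Name, m.Type, m.Value, m.Labels) // hypothetical exporter hook
+}
+ctx.ClearMetrics()
+```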
+
+### Supported Metric Types
+
+| Type | Description | Example |
+|------|-------------|---------|
+| `counter` | Monotonically increasing value | Request count, error count |
+| `histogram` | Value distribution | Latency, request size |
+| `gauge` | Value that can go up or down | Active connections, memory usage |
+
+---
+
+## Setup Guide
+
+### Pull Mode Setup
+
+**Step 1: Configure Global Metric Endpoint**
+
+```yaml
+# conf.yaml
+metric:
+ enable: true
+ prometheus_port: 2222
+```
+
+**Step 2: Add MetricReporter Filter**
+
+```yaml
+http_filters:
+ - name: dgp.filter.http.metric
+ config:
+ mode: "pull"
+```
+
+**Step 3: Configure Prometheus**
+
+```yaml
+# prometheus.yml
+scrape_configs:
+ - job_name: 'pixiu'
+ static_configs:
+ - targets: ['localhost:2222']
+ scrape_interval: 15s
+```
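+
+Note that the scrape target port (`localhost:2222`) must match the global `metric.prometheus_port` configured in Step 1.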
+
+**Step 4: Access Metrics**
+
+```bash
+curl http://localhost:2222/metrics
+```
+
+### Push Mode Setup
+
+**Step 1: Start Push Gateway**
+
+```bash
+docker run -d -p 9091:9091 prom/pushgateway
+```
+
+**Step 2: Configure MetricReporter**
+
+```yaml
+http_filters:
+ - name: dgp.filter.http.metric
+ config:
+ mode: "push"
+ push_config:
+ gateway_url: "http://localhost:9091"
+ job_name: "pixiu"
+ push_interval: 100
+ metric_path: "/metrics"
+```
+
+**Step 3: Verify Metrics**
+
+```bash
+# View metrics in Push Gateway
+curl http://localhost:9091/metrics
+```
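+
+Pushed series are grouped by the Push Gateway under the configured job label (here `job="pixiu"`).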
+
+---
+
+## Differences from Previous Filters
+
+### Replaces Legacy dgp.filter.http.prometheusmetric (Push)
+
+> **Important**: The `dgp.filter.http.metric` filter now supports both Pull and Push modes, with Push as the default. The legacy `dgp.filter.http.prometheusmetric` filter has been marked as deprecated.
+
+**Old Configuration (Deprecated):**
+```yaml
+- name: dgp.filter.http.prometheusmetric
+ config:
+ metric_collect_rules:
+ push_gateway_url: "http://localhost:9091"
+ counter_push: true
+ push_interval_threshold: 100
+ push_job_name: "pixiu"
+```
+
+**New Configuration (Recommended):**
+```yaml
+- name: dgp.filter.http.metric
+ config:
+ mode: "push"
+ push_config:
+ gateway_url: "http://localhost:9091"
+ job_name: "pixiu"
+ push_interval: 100
+ metric_path: "/metrics"
+```
+
+**With Default Configuration (Simpler):**
+```yaml
+- name: dgp.filter.http.metric
+ config: {}
+  # Defaults to push mode with gateway_url=http://localhost:9091, job_name=pixiu, push_interval=100
+```
+
+---
+
+## Troubleshooting
+
+### Pull Mode
+
+**Problem**: Cannot access metrics endpoint
+
+**Solution**: Ensure global metric configuration is enabled:
+```yaml
+metric:
+ enable: true
+ prometheus_port: 2222
+```
+
+**Problem**: Metrics not showing up
+
+**Solution**: Check that the filter is placed at the end of the `http_filters` list
+
+### Push Mode
+
+**Problem**: Metrics not pushed to Gateway
+
+**Solution**:
+1. Verify Push Gateway is running: `curl http://push-gateway:9091`
+2. Check push interval threshold is met (send N requests)
+3. Review logs for push errors
+
+**Problem**: Duplicate metrics warnings
+
+**Solution**: This is normal when running multiple tests; metrics will be properly aggregated
+
+---
+
+## Notes
+
+- **Filter Order**: MetricReporter should be placed **last** in the http_filters list
+- **Pull Endpoint**: Controlled by the global `metric.prometheus_port`, not the filter config
+- **Unified Metrics**: Pull and Push modes use identical metric names and types
+- **Configuration Defaults**: All Push mode configuration fields can be omitted; defaults are applied automatically
+- **Thread Safety**: The filter is thread-safe and supports high-concurrency scenarios
+
diff --git a/docs/user/filter/metric_CN.md b/docs/user/filter/metric_CN.md
new file mode 100644
index 00000000..8605021e
--- /dev/null
+++ b/docs/user/filter/metric_CN.md
@@ -0,0 +1,362 @@
+# 指标上报过滤器 (dgp.filter.http.metric)
+
+[English](metric.md) | 中文
+
+---
+
+## 概述
+
+`dgp.filter.http.metric` 过滤器为 Pixiu 网关提供统一的指标上报功能。它整合了之前的两个过滤器(`dgp.filter.http.metric` 和 `dgp.filter.http.prometheusmetric`)的功能,并支持 **Pull** 和 **Push** 两种模式,集成了 OpenTelemetry。
+
+> **注意**:此过滤器默认使用 **Push** 模式。如需使用 Pull 模式,请在配置中显式指定 `mode: "pull"`。
+
+### 核心特性
+
+- **统一入口**:单个过滤器支持 Pull 和 Push 两种模式
+- **OpenTelemetry 集成**:Pull 模式使用 OpenTelemetry(与 Pixiu Tracing 保持一致)
+- **Context 扩展**:其他过滤器可通过 `HttpContext.RecordMetric()` 记录自定义指标
+- **向后兼容**:复用原有过滤器的逻辑
+
+---
+
+## 模式说明
+
+### Pull 模式(推荐)
+
+指标通过 HTTP 端点暴露,供 Prometheus 抓取。
+
+**特点:**
+- 使用 OpenTelemetry SDK
+- 指标通过全局 HTTP 端点暴露
+- Prometheus 标准拉取模型
+- 支持来自 Context 的动态自定义指标
+
+**适用场景:**
+- 长期运行的服务
+- Kubernetes 环境
+- 开发和测试环境
+
+### Push 模式
+
+指标主动推送到 Prometheus Push Gateway。
+
+**特点:**
+- 使用 Prometheus 原生 SDK
+- 每 N 个请求推送一次指标
+- 批量推送减少网络开销
+- 支持来自 Context 的动态自定义指标(与 Pull 模式相同)
+
+**适用场景:**
+- 防火墙后的服务
+- 短生命周期批处理任务
+- 无法暴露入站端口的环境
+
+---
+
+## 配置说明
+
+### Pull 模式
+
+```yaml
+static_resources:
+ listeners:
+ - name: "net/http"
+ protocol_type: "HTTP"
+ address:
+ socket_address:
+ address: "0.0.0.0"
+ port: 8888
+ filter_chains:
+ filters:
+ - name: dgp.filter.httpconnectionmanager
+ config:
+ route_config:
+ routes:
+ - match:
+ prefix: /
+ route:
+ cluster: backend
+ http_filters:
+ # 其他过滤器...
+ - name: dgp.filter.http.httpproxy
+ config: {}
+
+ # MetricReporter 必须放在最后
+ - name: dgp.filter.http.metric
+ config:
+ mode: "pull"
+
+# 全局指标配置(控制 HTTP 端点)
+metric:
+ enable: true
+ prometheus_port: 2222 # 访问地址:http://localhost:2222/metrics
+```
+
+### Push 模式
+
+```yaml
+http_filters:
+ - name: dgp.filter.http.metric
+ config:
+ mode: "push"
+ push_config:
+      gateway_url: "http://push-gateway:9091"   # Push Gateway 地址(默认:http://localhost:9091)
+ job_name: "pixiu" # 任务名称(默认:pixiu)
+ push_interval: 100 # 每 100 个请求推送一次(默认:100)
+ metric_path: "/metrics" # 推送路径(默认:/metrics)
+```
+
+**注意**:`push_config` 中的所有字段都有默认值。如果省略,将自动应用默认值。
+
+**最小化配置(使用所有默认值)**:
+```yaml
+http_filters:
+ - name: dgp.filter.http.metric
+ config:
+ mode: "push"
+ # push_config 可以省略或为空,将使用所有默认值
+```
+
+**极简配置(默认使用 Push 模式)**:
+```yaml
+http_filters:
+ - name: dgp.filter.http.metric
+ config: {}
+ # 默认使用 push 模式和所有默认配置
+```
+
+**默认值**:
+- `gateway_url`: `http://localhost:9091`
+- `job_name`: `pixiu`
+- `push_interval`: `100`
+- `metric_path`: `/metrics`
+
+---
+
+## 内置指标
+
+**Pull 和 Push 模式现在使用统一的指标名称和类型:**
+
+| 指标名称 | 类型 | 描述 | 标签 | 状态 |
+|---------|------|------|------|------|
+| `pixiu_requests_total` | Counter | 请求总数 | code, method, host, url | ⚠️ 已弃用 |
+| `pixiu_request_count` | Counter | 请求总数 | code, method, host, url | ✅ 推荐 |
+| `pixiu_request_elapsed` | Counter | 请求总耗时(毫秒)| code, method, host, url | ✅ |
+| `pixiu_request_error_count` | Counter | 错误总数 | code, method, host, url | ✅ |
+| `pixiu_request_content_length` | Counter | 请求大小(字节)| code, method, url | ✅ |
+| `pixiu_response_content_length` | Counter | 响应大小(字节)| code, method, url | ✅ |
+| `pixiu_process_time_millisec` | Histogram | 请求处理时长分布(毫秒)| code, method, url | ✅ |
+
+**向后兼容说明**:
+- 为保持向后兼容,目前**同时导出**两个指标:`pixiu_requests_total`(旧)和 `pixiu_request_count`(新)
+- 推荐使用新的指标名称 `pixiu_request_count`
+- `pixiu_requests_total` 将在未来版本中移除
+- 两种模式使用相同的指标名称,便于在不同模式间切换
+
+---
+
+## 自定义指标(扩展功能)
+
+其他过滤器可以记录自定义指标,由 MetricReporter 自动收集并上报。
+
+**Pull 和 Push 模式都支持**:通过 `HttpContext.RecordMetric()` 记录的自定义指标在两种模式下都能完整导出。
+
+### 在自定义过滤器中使用
+
+```go
+package myfilter
+
+import (
+ "fmt"
+ "time"
+ "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+)
+
+type Filter struct {
+ startTime time.Time
+}
+
+func (f *Filter) Decode(ctx *http.HttpContext) filter.FilterStatus {
+ f.startTime = time.Now()
+ return filter.Continue
+}
+
+func (f *Filter) Encode(ctx *http.HttpContext) filter.FilterStatus {
+ // 记录 Counter 指标
+ ctx.RecordMetric("my_requests_total", "counter", 1.0, map[string]string{
+ "method": ctx.GetMethod(),
+ "status": fmt.Sprintf("%d", ctx.GetStatusCode()),
+ })
+
+ // 记录 Histogram 指标
+ latency := time.Since(f.startTime).Milliseconds()
+    ctx.RecordMetric("my_request_duration_ms", "histogram", float64(latency), map[string]string{
+ "endpoint": ctx.GetUrl(),
+ })
+
+ // 记录 Gauge 指标
+ ctx.RecordMetric("my_active_connections", "gauge", float64(42), nil)
+
+ return filter.Continue
+}
+```
+
+### 支持的指标类型
+
+| 类型 | 描述 | 示例 |
+|------|------|------|
+| `counter` | 单调递增的值 | 请求数、错误数 |
+| `histogram` | 值的分布统计 | 延迟、请求大小 |
+| `gauge` | 可增可减的值 | 活跃连接数、内存使用 |
+
+---
+
+## 使用指南
+
+### Pull 模式配置
+
+**步骤 1:配置全局指标端点**
+
+```yaml
+# conf.yaml
+metric:
+ enable: true
+ prometheus_port: 2222
+```
+
+**步骤 2:添加 MetricReporter 过滤器**
+
+```yaml
+http_filters:
+ - name: dgp.filter.http.metric
+ config:
+ mode: "pull"
+```
+
+**步骤 3:配置 Prometheus**
+
+```yaml
+# prometheus.yml
+scrape_configs:
+ - job_name: 'pixiu'
+ static_configs:
+ - targets: ['localhost:2222']
+ scrape_interval: 15s
+```
+
+**步骤 4:访问指标**
+
+```bash
+curl http://localhost:2222/metrics
+```
+
+### Push 模式配置
+
+**步骤 1:启动 Push Gateway**
+
+```bash
+docker run -d -p 9091:9091 prom/pushgateway
+```
+
+**步骤 2:配置 MetricReporter**
+
+```yaml
+http_filters:
+ - name: dgp.filter.http.metric
+ config:
+ mode: "push"
+ push_config:
+ gateway_url: "http://localhost:9091"
+ job_name: "pixiu"
+ push_interval: 100
+ metric_path: "/metrics"
+```
+
+**步骤 3:验证指标**
+
+```bash
+# 查看 Push Gateway 中的指标
+curl http://localhost:9091/metrics
+```
+
+---
+
+## 与旧版过滤器的区别
+
+### 替代旧版 dgp.filter.http.prometheusmetric (Push)
+
+> **重要说明**:`dgp.filter.http.metric` 过滤器现在统一支持 Pull 和 Push 两种模式,默认为 Push 模式。旧版的 `dgp.filter.http.prometheusmetric` 过滤器已被标记为废弃。
+
+**旧配置(已废弃):**
+```yaml
+- name: dgp.filter.http.prometheusmetric
+ config:
+ metric_collect_rules:
+ push_gateway_url: "http://localhost:9091"
+ counter_push: true
+ push_interval_threshold: 100
+ push_job_name: "pixiu"
+```
+
+**新配置(推荐):**
+```yaml
+- name: dgp.filter.http.metric
+ config:
+ mode: "push"
+ push_config:
+ gateway_url: "http://localhost:9091"
+ job_name: "pixiu"
+ push_interval: 100
+ metric_path: "/metrics"
+```
+
+**使用默认配置(更简单):**
+```yaml
+- name: dgp.filter.http.metric
+ config: {}
+  # 默认使用 push 模式,gateway_url=http://localhost:9091, job_name=pixiu, push_interval=100
+```
+
+---
+
+## 故障排查
+
+### Pull 模式
+
+**问题**:无法访问指标端点
+
+**解决**:确保全局指标配置已启用:
+```yaml
+metric:
+ enable: true
+ prometheus_port: 2222
+```
+
+**问题**:指标未显示
+
+**解决**:检查过滤器是否放在 http_filters 列表的最后
+
+### Push 模式
+
+**问题**:指标未推送到 Gateway
+
+**解决**:
+1. 验证 Push Gateway 正在运行:`curl http://push-gateway:9091`
+2. 检查推送间隔阈值是否达到(发送 N 个请求)
+3. 查看日志中的推送错误
+
+**问题**:重复指标警告
+
+**解决**:这在运行多个测试时是正常的;指标会被正确聚合
+
+---
+
+## 注意事项
+
+- **过滤器顺序**:MetricReporter 应放在 http_filters 列表的**最后**
+- **Pull 端点**:由全局 `metric.prometheus_port` 控制,而非过滤器配置
+- **指标统一**:Pull 和 Push 模式使用相同的指标名称和类型
+- **配置默认值**:Push 模式的所有配置字段都可以省略,会自动应用默认值
+- **线程安全**:过滤器是线程安全的,支持高并发场景
+
diff --git a/pkg/common/constant/filter.go b/pkg/common/constant/filter.go
index 4867f024..db1eb5c7 100644
--- a/pkg/common/constant/filter.go
+++ b/pkg/common/constant/filter.go
@@ -39,3 +39,12 @@ const (
DefaultReqTimeout = 10 * time.Second
)
+
+const (
+ // dgp.filter.http.metric
+ DefaultMetricMode = "push" // Default mode is push
+ DefaultMetricPushGatewayURL = "http://localhost:9091"
+ DefaultMetricPushJobName = "pixiu"
+ DefaultMetricPushInterval = 100
+ DefaultMetricPushPath = "/metrics"
+)
diff --git a/pkg/context/http/context.go b/pkg/context/http/context.go
index 6f173bfc..e331e242 100644
--- a/pkg/context/http/context.go
+++ b/pkg/context/http/context.go
@@ -25,6 +25,7 @@ import (
"net/http"
"net/url"
"strings"
+ "sync"
"time"
)
@@ -69,6 +70,10 @@ type HttpContext struct {
Request *http.Request
Writer http.ResponseWriter
+
+ // Metrics storage for unified metric reporting
+ metrics []*MetricData
+ metricsMu sync.RWMutex
}
type (
@@ -232,3 +237,50 @@ func (hc *HttpContext) GenerateHash() string {
req := hc.Request
return req.Method + "." + req.RequestURI
}
+
+// MetricData represents a single metric data point.
+type MetricData struct {
+ Name string // Metric name
+ Type string // Metric type: "counter", "histogram", "gauge"
+ Value float64 // Metric value
+ Labels map[string]string // Metric labels
+}
+
+// RecordMetric records a metric to the context.
+func (hc *HttpContext) RecordMetric(name string, metricType string, value float64, labels map[string]string) {
+ // Create a copy of labels to avoid mutations (outside lock)
+ labelsCopy := make(map[string]string, len(labels))
+ for k, v := range labels {
+ labelsCopy[k] = v
+ }
+
+ metric := &MetricData{
+ Name: name,
+ Type: metricType,
+ Value: value,
+ Labels: labelsCopy,
+ }
+
+ // Only lock for the append operation
+ hc.metricsMu.Lock()
+ defer hc.metricsMu.Unlock()
+ hc.metrics = append(hc.metrics, metric)
+}
+
+// GetAllMetrics returns all recorded metrics.
+func (hc *HttpContext) GetAllMetrics() []*MetricData {
+ // Return a copy to avoid race conditions
+ hc.metricsMu.RLock()
+ defer hc.metricsMu.RUnlock()
+ result := make([]*MetricData, len(hc.metrics))
+ copy(result, hc.metrics)
+ return result
+}
+
+// ClearMetrics clears all recorded metrics.
+func (hc *HttpContext) ClearMetrics() {
+ hc.metricsMu.Lock()
+ defer hc.metricsMu.Unlock()
+
+ hc.metrics = nil
+}
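+
+// Typical lifecycle: upstream filters call RecordMetric while a request is
+// being processed; the metric reporter filter drains the buffer via
+// GetAllMetrics in its Encode phase and may call ClearMetrics to reset it.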
diff --git a/pkg/context/http/context_test.go b/pkg/context/http/context_test.go
index 17961ff8..102cdafc 100644
--- a/pkg/context/http/context_test.go
+++ b/pkg/context/http/context_test.go
@@ -447,3 +447,337 @@ func TestErrorResponseJSONMarshaling(t *testing.T) {
}
})
}
+
+// TestRecordMetric tests recording metrics to context
+func TestRecordMetric(t *testing.T) {
+ req, err := http.NewRequest("GET", "http://example.com/test", nil)
+ if err != nil {
+ t.Fatalf("failed to create request: %v", err)
+ }
+
+ ctx := newTestHTTPContext(req)
+
+ // Record a metric
+ ctx.RecordMetric("test_metric", "counter", 1.0, map[string]string{
+ "label1": "value1",
+ "label2": "value2",
+ })
+
+ // Get all metrics
+ metrics := ctx.GetAllMetrics()
+
+ // Verify
+ if len(metrics) != 1 {
+ t.Errorf("expected 1 metric, got %d", len(metrics))
+ }
+
+ metric := metrics[0]
+ if metric.Name != "test_metric" {
+		t.Errorf("expected metric name 'test_metric', got %s", metric.Name)
+ }
+ if metric.Type != "counter" {
+ t.Errorf("expected metric type 'counter', got %s", metric.Type)
+ }
+ if metric.Value != 1.0 {
+ t.Errorf("expected metric value 1.0, got %f", metric.Value)
+ }
+ if len(metric.Labels) != 2 {
+ t.Errorf("expected 2 labels, got %d", len(metric.Labels))
+ }
+ if metric.Labels["label1"] != "value1" {
+		t.Errorf("expected label1=value1, got %s", metric.Labels["label1"])
+ }
+}
+
+// TestRecordMultipleMetrics tests recording multiple metrics
+func TestRecordMultipleMetrics(t *testing.T) {
+ req, _ := http.NewRequest("GET", "http://example.com/test", nil)
+ ctx := newTestHTTPContext(req)
+
+ // Record multiple metrics
+	ctx.RecordMetric("metric1", "counter", 1.0, map[string]string{"type": "counter"})
+	ctx.RecordMetric("metric2", "histogram", 123.45, map[string]string{"type": "histogram"})
+	ctx.RecordMetric("metric3", "gauge", 42.0, map[string]string{"type": "gauge"})
+
+ // Get all metrics
+ metrics := ctx.GetAllMetrics()
+
+ // Verify
+ if len(metrics) != 3 {
+ t.Errorf("expected 3 metrics, got %d", len(metrics))
+ }
+
+ // Verify each metric
+ metricNames := make(map[string]bool)
+ for _, m := range metrics {
+ metricNames[m.Name] = true
+ }
+
+	if !metricNames["metric1"] || !metricNames["metric2"] || !metricNames["metric3"] {
+ t.Error("not all metrics were recorded")
+ }
+}
+
+// TestClearMetrics tests clearing metrics
+func TestClearMetrics(t *testing.T) {
+ req, _ := http.NewRequest("GET", "http://example.com/test", nil)
+ ctx := newTestHTTPContext(req)
+
+ // Record metrics
+ ctx.RecordMetric("test1", "counter", 1.0, nil)
+ ctx.RecordMetric("test2", "counter", 2.0, nil)
+
+ // Verify metrics exist
+ metrics := ctx.GetAllMetrics()
+ if len(metrics) != 2 {
+		t.Errorf("expected 2 metrics before clear, got %d", len(metrics))
+ }
+
+ // Clear metrics
+ ctx.ClearMetrics()
+
+ // Verify metrics are cleared
+ metrics = ctx.GetAllMetrics()
+ if len(metrics) != 0 {
+ t.Errorf("expected 0 metrics after clear, got %d", len(metrics))
+ }
+}
+
+// TestMetricLabelIsolation tests that modifying labels doesn't affect stored metrics
+func TestMetricLabelIsolation(t *testing.T) {
+ req, _ := http.NewRequest("GET", "http://example.com/test", nil)
+ ctx := newTestHTTPContext(req)
+
+ // Create labels
+ labels := map[string]string{
+ "key": "original",
+ }
+
+ // Record metric
+ ctx.RecordMetric("test_metric", "counter", 1.0, labels)
+
+ // Modify original labels
+ labels["key"] = "modified"
+ labels["new_key"] = "new_value"
+
+ // Get metrics
+ metrics := ctx.GetAllMetrics()
+
+ // Verify the stored metric labels are not affected
+ if len(metrics) != 1 {
+ t.Fatalf("expected 1 metric, got %d", len(metrics))
+ }
+
+ storedLabels := metrics[0].Labels
+ if storedLabels["key"] != "original" {
+		t.Errorf("expected label to be 'original', got %s", storedLabels["key"])
+ }
+ if _, exists := storedLabels["new_key"]; exists {
+ t.Error("new_key should not exist in stored labels")
+ }
+}
+
+// TestRecordMetricWithNilLabels tests recording metrics with nil labels
+func TestRecordMetricWithNilLabels(t *testing.T) {
+ req, _ := http.NewRequest("GET", "http://example.com/test", nil)
+ ctx := newTestHTTPContext(req)
+
+ // Record metric with nil labels
+ ctx.RecordMetric("test_metric", "counter", 1.0, nil)
+
+ // Get metrics
+ metrics := ctx.GetAllMetrics()
+
+ // Verify
+ if len(metrics) != 1 {
+ t.Fatalf("expected 1 metric, got %d", len(metrics))
+ }
+
+ if metrics[0].Labels == nil {
+ t.Error("labels should not be nil, should be empty map")
+ }
+ if len(metrics[0].Labels) != 0 {
+ t.Errorf("expected 0 labels, got %d", len(metrics[0].Labels))
+ }
+}
+
+// TestRecordMetricWithEmptyLabels tests recording metrics with empty labels
+func TestRecordMetricWithEmptyLabels(t *testing.T) {
+ req, _ := http.NewRequest("GET", "http://example.com/test", nil)
+ ctx := newTestHTTPContext(req)
+
+ // Record metric with empty labels
+ ctx.RecordMetric("test_metric", "counter", 1.0, map[string]string{})
+
+ // Get metrics
+ metrics := ctx.GetAllMetrics()
+
+ // Verify
+ if len(metrics) != 1 {
+ t.Fatalf("expected 1 metric, got %d", len(metrics))
+ }
+
+ if len(metrics[0].Labels) != 0 {
+ t.Errorf("expected 0 labels, got %d", len(metrics[0].Labels))
+ }
+}
+
+// TestGetAllMetricsReturnsCopy tests that GetAllMetrics returns a copy
+func TestGetAllMetricsReturnsCopy(t *testing.T) {
+ req, _ := http.NewRequest("GET", "http://example.com/test", nil)
+ ctx := newTestHTTPContext(req)
+
+ // Record a metric
+	ctx.RecordMetric("test1", "counter", 1.0, map[string]string{"key": "value"})
+
+ // Get metrics twice
+ metrics1 := ctx.GetAllMetrics()
+ metrics2 := ctx.GetAllMetrics()
+
+ // Verify they are different slices (not the same reference)
+ if &metrics1[0] == &metrics2[0] {
+		t.Error("GetAllMetrics should return a copy, not the same reference")
+ }
+
+ // Verify contents are the same
+ if metrics1[0].Name != metrics2[0].Name {
+ t.Error("metric contents should be the same")
+ }
+}
+
+// TestConcurrentMetricRecording tests concurrent metric recording
+func TestConcurrentMetricRecording(t *testing.T) {
+ req, _ := http.NewRequest("GET", "http://example.com/test", nil)
+ ctx := newTestHTTPContext(req)
+
+ // Record metrics concurrently
+ done := make(chan bool)
+ for i := 0; i < 100; i++ {
+ go func(id int) {
+ ctx.RecordMetric(
+ fmt.Sprintf("metric_%d", id),
+ "counter",
+ float64(id),
+ map[string]string{"id": fmt.Sprintf("%d", id)},
+ )
+ done <- true
+ }(i)
+ }
+
+ // Wait for all goroutines
+ for i := 0; i < 100; i++ {
+ <-done
+ }
+
+ // Verify all metrics were recorded
+ metrics := ctx.GetAllMetrics()
+ if len(metrics) != 100 {
+ t.Errorf("expected 100 metrics, got %d", len(metrics))
+ }
+}
+
+// TestConcurrentGetAllMetrics tests concurrent calls to GetAllMetrics
+func TestConcurrentGetAllMetrics(t *testing.T) {
+ req, _ := http.NewRequest("GET", "http://example.com/test", nil)
+ ctx := newTestHTTPContext(req)
+
+ // Pre-populate with metrics
+ for i := 0; i < 50; i++ {
+		ctx.RecordMetric(fmt.Sprintf("metric_%d", i), "counter", float64(i), nil)
+ }
+
+ // Concurrently call GetAllMetrics
+ done := make(chan bool)
+ for i := 0; i < 100; i++ {
+ go func() {
+ metrics := ctx.GetAllMetrics()
+ if len(metrics) != 50 {
+				t.Errorf("expected 50 metrics, got %d", len(metrics))
+ }
+ done <- true
+ }()
+ }
+
+ // Wait for all goroutines
+ for i := 0; i < 100; i++ {
+ <-done
+ }
+}
+
+// TestMetricDataTypes tests different metric types
+func TestMetricDataTypes(t *testing.T) {
+ req, _ := http.NewRequest("GET", "http://example.com/test", nil)
+ ctx := newTestHTTPContext(req)
+
+ tests := []struct {
+ name string
+ metricName string
+ metricType string
+ value float64
+ }{
+ {"counter", "test_counter", "counter", 1.0},
+ {"histogram", "test_histogram", "histogram", 123.45},
+ {"gauge", "test_gauge", "gauge", 42.0},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ ctx.ClearMetrics() // Clear before each test
+
+			ctx.RecordMetric(tt.metricName, tt.metricType, tt.value, nil)
+
+			metrics := ctx.GetAllMetrics()
+			if len(metrics) != 1 {
+				t.Fatalf("expected 1 metric, got %d", len(metrics))
+			}
+
+			if metrics[0].Name != tt.metricName {
+				t.Errorf("expected name %s, got %s", tt.metricName, metrics[0].Name)
+			}
+			if metrics[0].Type != tt.metricType {
+				t.Errorf("expected type %s, got %s", tt.metricType, metrics[0].Type)
+			}
+			if metrics[0].Value != tt.value {
+				t.Errorf("expected value %f, got %f", tt.value, metrics[0].Value)
+			}
+ })
+ }
+}
+
+// BenchmarkRecordMetric benchmarks recording metrics
+func BenchmarkRecordMetric(b *testing.B) {
+ req, _ := http.NewRequest("GET", "http://example.com/test", nil)
+ ctx := newTestHTTPContext(req)
+
+ labels := map[string]string{
+ "method": "GET",
+ "status": "200",
+ "path": "/api/test",
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ ctx.RecordMetric("benchmark_metric", "counter", 1.0, labels)
+ }
+}
+
+// BenchmarkGetAllMetrics benchmarks getting all metrics
+func BenchmarkGetAllMetrics(b *testing.B) {
+ req, _ := http.NewRequest("GET", "http://example.com/test", nil)
+ ctx := newTestHTTPContext(req)
+
+ // Pre-populate with metrics
+ for i := 0; i < 10; i++ {
+ ctx.RecordMetric(
+ fmt.Sprintf("metric_%d", i),
+ "counter",
+ float64(i),
+ map[string]string{"id": fmt.Sprintf("%d", i)},
+ )
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ _ = ctx.GetAllMetrics()
+ }
+}
diff --git a/pkg/filter/llm/proxy/filter.go b/pkg/filter/llm/proxy/filter.go
index 67791229..e9194e4d 100644
--- a/pkg/filter/llm/proxy/filter.go
+++ b/pkg/filter/llm/proxy/filter.go
@@ -134,7 +134,7 @@ func (factory *FilterFactory) Apply() error {
}
// PrepareFilterChain creates a new Filter instance for a request chain.
-func (factory *FilterFactory) PrepareFilterChain(ctx *contexthttp.HttpContext, chain filter.FilterChain) error {
+func (factory *FilterFactory) PrepareFilterChain(_ *contexthttp.HttpContext, chain filter.FilterChain) error {
f := &Filter{
client: factory.client,
scheme: factory.cfg.Scheme,
@@ -260,6 +260,7 @@ func (s *Strategy) Execute(executor *RequestExecutor) (*http.Response, error) {
resp *http.Response
err error
attempts []UpstreamAttempt
+ problems []error
)
// 1. Pick initial endpoint from the cluster based on load balancing.
@@ -267,6 +268,13 @@ func (s *Strategy) Execute(executor *RequestExecutor) (*http.Response, error) {
 	// 2. The main fallback loop. It continues as long as we have a valid endpoint to try.
for endpoint != nil {
+ if endpoint.Metadata == nil {
+ endpoint.Metadata = make(map[string]string)
+ }
+ if executor.hc.Params == nil {
+ executor.hc.Params = make(map[string]any)
+ }
+
logger.Debugf("[dubbo-go-pixiu] client attempting endpoint [%s:
%v]", endpoint.ID, endpoint.Address.GetAddress())
// 3. Check the health of current endpoint,
@@ -291,6 +299,7 @@ func (s *Strategy) Execute(executor *RequestExecutor) (*http.Response, error) {
retryPolicy, err = retry.GetRetryPolicy(endpoint)
if err != nil {
logger.Errorf("could not load retry policy for endpoint
[%s: %v]. Skipping to next endpoint.", endpoint.ID, err)
+ problems = append(problems, fmt.Errorf("endpoint [%s:
%v] retry policy error: %w", endpoint.ID, endpoint.Address.GetAddress(), err))
endpoint = getNextFallbackEndpoint(endpoint, executor)
continue
}
@@ -303,6 +312,7 @@ func (s *Strategy) Execute(executor *RequestExecutor) (*http.Response, error) {
 			if err != nil {
 				// Request assembly error is fatal for this endpoint, break retry loop to go to fallback
 				logger.Warnf("[dubbo-go-pixiu] failed to assemble request for endpoint [%s: %v]: %v. Skipping to next endpoint.", endpoint.ID, endpoint.Address.GetAddress(), err)
+				problems = append(problems, fmt.Errorf("endpoint [%s: %v] retry: request assembly error: %w", endpoint.ID, endpoint.Address.GetAddress(), err))
break
}
@@ -316,6 +326,7 @@ func (s *Strategy) Execute(executor *RequestExecutor) (*http.Response, error) {
 			if err != nil {
 				logger.Warnf("[dubbo-go-pixiu] request to endpoint [%s: %v] failed: %v", endpoint.ID, endpoint.Address.GetAddress(), err)
+				problems = append(problems, fmt.Errorf("endpoint [%s: %v] retry: request error: %w", endpoint.ID, endpoint.Address.GetAddress(), err))
 				attempt.Success = false
 				attempt.ErrorType = "network_error"
 				attempts = append(attempts, attempt)
@@ -332,6 +343,7 @@ func (s *Strategy) Execute(executor *RequestExecutor) (*http.Response, error) {
 			attempt.Success = false
 			attempt.ErrorType = "status_code_error"
+			problems = append(problems, fmt.Errorf("endpoint [%s: %v] retry: returned status code %d", endpoint.ID, endpoint.Address.GetAddress(), resp.StatusCode))
 			attempts = append(attempts, attempt)
 			logger.Debugf("[dubbo-go-pixiu] attempt failed for endpoint [%s: %v]. Error: %v, Status: %s trying to retry",
@@ -350,11 +362,11 @@ func (s *Strategy) Execute(executor *RequestExecutor) (*http.Response, error) {
 	// Return the last known error and response.
 	if err == nil && resp != nil {
-		err = fmt.Errorf("request failed with status code %d after all retries and fallbacks", resp.StatusCode)
+		problems = append(problems, fmt.Errorf("request failed with status code %d after all retries and fallbacks", resp.StatusCode))
 	} else if err == nil {
-		err = errors.New("all retries and fallbacks failed without a definitive error or response")
+		problems = append(problems, errors.New("all retries and fallbacks failed without a definitive error or response"))
 	}
-	return resp, err
+	return resp, errors.Join(problems...)
 }
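+
+// Note: errors.Join (Go 1.20+) flattens the accumulated problems into one
+// error while keeping each wrapped cause visible to errors.Is / errors.As;
+// it returns nil when the problems slice is empty.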
 // getNextFallbackEndpoint checks if fallback is enabled and returns the next endpoint.
diff --git a/pkg/filter/metric/config.go b/pkg/filter/metric/config.go
new file mode 100644
index 00000000..6387c1e2
--- /dev/null
+++ b/pkg/filter/metric/config.go
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package metric
+
+import (
+ "fmt"
+)
+
+import (
+ "go.opentelemetry.io/otel/metric/instrument/syncint64"
+)
+
+import (
+ "github.com/apache/dubbo-go-pixiu/pkg/common/constant"
+)
+
+// Config defines the configuration for the unified metric reporter filter.
+type Config struct {
+ // Mode defines the metric reporting mode: "pull" or "push"
+ Mode string `yaml:"mode" json:"mode"`
+
+ // Push configuration for push mode (Push Gateway)
+	// Note: Pull mode uses global metric configuration (metric.enable, metric.prometheus_port)
+ Push PushConfig `yaml:"push_config" json:"push_config"`
+}
+
+// PushConfig defines the configuration for push mode.
+type PushConfig struct {
+ // GatewayURL is the Push Gateway URL (default: http://localhost:9091)
+ GatewayURL string `yaml:"gateway_url" json:"gateway_url"`
+
+ // JobName is the job name for Push Gateway (default: pixiu)
+ JobName string `yaml:"job_name" json:"job_name"`
+
+	// PushInterval defines how many requests to process before pushing metrics (default: 100)
+ PushInterval int `yaml:"push_interval" json:"push_interval"`
+
+	// MetricPath is the path to push metrics to Push Gateway (default: /metrics)
+ MetricPath string `yaml:"metric_path" json:"metric_path"`
+}
+
+type OTelInstruments struct {
+ totalElapsed syncint64.Counter
+ totalCount syncint64.Counter
+ totalError syncint64.Counter
+ sizeRequest syncint64.Counter
+ sizeResponse syncint64.Counter
+ durationHist syncint64.Histogram
+}
+
+// Validate validates the configuration based on mode.
+func (c *Config) Validate() error {
+ // Apply default mode if not specified
+ if c.Mode == "" {
+ c.Mode = constant.DefaultMetricMode
+ }
+
+ // Validate mode
+ if c.Mode != "pull" && c.Mode != "push" {
+		return fmt.Errorf("invalid mode '%s', must be 'pull' or 'push'", c.Mode)
+ }
+
+ // Validate push config if in push mode
+	// Pull mode has no filter-level configuration (uses global metric config)
+ if c.Mode == "push" {
+ return c.Push.Validate()
+ }
+
+ return nil
+}
+
+// Validate validates push mode configuration and applies defaults for empty fields.
+func (c *PushConfig) Validate() error {
+ // Apply defaults for empty fields
+ if c.GatewayURL == "" {
+ c.GatewayURL = constant.DefaultMetricPushGatewayURL
+ }
+
+ if c.JobName == "" {
+ c.JobName = constant.DefaultMetricPushJobName
+ }
+
+ if c.PushInterval <= 0 {
+ c.PushInterval = constant.DefaultMetricPushInterval
+ }
+
+ if c.MetricPath == "" {
+ c.MetricPath = constant.DefaultMetricPushPath
+ }
+
+ // All fields now have values (either user-provided or defaults)
+ return nil
+}
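+
+// Example: Config{Mode: "push"} with a zero-value PushConfig is valid;
+// Validate fills in GatewayURL "http://localhost:9091", JobName "pixiu",
+// PushInterval 100 and MetricPath "/metrics" from the DefaultMetricPush*
+// constants.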
diff --git a/pkg/filter/metric/metric.go b/pkg/filter/metric/metric.go
index 781d21ac..3cbf58ce 100644
--- a/pkg/filter/metric/metric.go
+++ b/pkg/filter/metric/metric.go
@@ -20,6 +20,7 @@ package metric
import (
"fmt"
stdhttp "net/http"
+ "sync"
"time"
)
@@ -30,128 +31,318 @@ import (
"go.opentelemetry.io/otel/metric/global"
"go.opentelemetry.io/otel/metric/instrument"
- "go.opentelemetry.io/otel/metric/instrument/syncint64"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/client"
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
- "github.com/apache/dubbo-go-pixiu/pkg/context/http"
+ contextHttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
+ prom "github.com/apache/dubbo-go-pixiu/pkg/prometheus"
)
const (
+ // Kind defines the filter kind
Kind = constant.HTTPMetricFilter
)
-var (
- totalElapsed syncint64.Counter
- totalCount syncint64.Counter
- totalError syncint64.Counter
-
- sizeRequest syncint64.Counter
- sizeResponse syncint64.Counter
- durationHist syncint64.Histogram
-)
-
func init() {
filter.RegisterHttpFilter(&Plugin{})
}
type (
// Plugin is http filter plugin.
- Plugin struct {
- }
+ Plugin struct{}
+
// FilterFactory is http filter instance
FilterFactory struct {
+ cfg *Config
}
+
+ // Filter instance
Filter struct {
- start time.Time
+ cfg *Config
+
+ otelInstruments *OTelInstruments
+ promCollector *prom.Prometheus
+ start time.Time
}
- // Config describe the config of FilterFactory
- Config struct{}
)
+// Kind returns the filter kind.
func (p *Plugin) Kind() string {
return Kind
}
+// CreateFilterFactory creates a new filter factory.
func (p *Plugin) CreateFilterFactory() (filter.HttpFilterFactory, error) {
- return &FilterFactory{}, nil
+ return &FilterFactory{
+ cfg: &Config{},
+ }, nil
}
+// Config returns the configuration.
func (factory *FilterFactory) Config() any {
- return &struct{}{}
+ return factory.cfg
}
+// Apply validates the configuration.
func (factory *FilterFactory) Apply() error {
- // init
- err := registerOtelMetric()
- return err
+ return factory.cfg.Validate()
+}
+
+var (
+ globalOTelInstruments *OTelInstruments
+ otelInitOnce sync.Once
+ otelInitErr error
+)
+
+// initOTelInstruments initializes OpenTelemetry instruments (singleton).
+func initOTelInstruments() (*OTelInstruments, error) {
+ otelInitOnce.Do(func() {
+ otelInitErr = doInitOTelInstruments()
+ })
+ return globalOTelInstruments, otelInitErr
}
-func (factory *FilterFactory) PrepareFilterChain(ctx *http.HttpContext, chain filter.FilterChain) error {
-	f := &Filter{}
+func doInitOTelInstruments() error {
+	meter := global.MeterProvider().Meter("pixiu")
+
+	instruments := &OTelInstruments{}
+
+	elapsedCounter, err := meter.SyncInt64().Counter("pixiu_request_elapsed",
+		instrument.WithDescription("request total elapsed in pixiu"))
+	if err != nil {
+		return fmt.Errorf("register pixiu_request_elapsed metric failed: %w", err)
+	}
+	instruments.totalElapsed = elapsedCounter
+
+	count, err := meter.SyncInt64().Counter("pixiu_request_count",
+		instrument.WithDescription("request total count in pixiu"))
+	if err != nil {
+		return fmt.Errorf("register pixiu_request_count metric failed: %w", err)
+	}
+	instruments.totalCount = count
+
+	errorCounter, err := meter.SyncInt64().Counter("pixiu_request_error_count",
+		instrument.WithDescription("request error total count in pixiu"))
+	if err != nil {
+		return fmt.Errorf("register pixiu_request_error_count metric failed: %w", err)
+	}
+	instruments.totalError = errorCounter
+
+	sizeRequest, err := meter.SyncInt64().Counter("pixiu_request_content_length",
+		instrument.WithDescription("request total content length in pixiu"))
+	if err != nil {
+		return fmt.Errorf("register pixiu_request_content_length metric failed: %w", err)
+	}
+	instruments.sizeRequest = sizeRequest
+
+	sizeResponse, err := meter.SyncInt64().Counter("pixiu_response_content_length",
+		instrument.WithDescription("request total content length response in pixiu"))
+	if err != nil {
+		return fmt.Errorf("register pixiu_response_content_length metric failed: %w", err)
+	}
+	instruments.sizeResponse = sizeResponse
+
+	durationHist, err := meter.SyncInt64().Histogram("pixiu_process_time_millisec",
+		instrument.WithDescription("request process time response in pixiu"))
+	if err != nil {
+		return fmt.Errorf("register pixiu_process_time_millisec metric failed: %w", err)
+	}
+	instruments.durationHist = durationHist
+
+	globalOTelInstruments = instruments
+	logger.Infof("[MetricReporter] OpenTelemetry instruments registered")
+	return nil
+}
+
+// PrepareFilterChain prepares the filter chain.
+func (factory *FilterFactory) PrepareFilterChain(ctx *contextHttp.HttpContext, chain filter.FilterChain) error {
+ // Copy config to avoid sharing factory's pointer
+ cfgCopy := *factory.cfg
+ f := &Filter{cfg: &cfgCopy}
+
+ // Initialize based on mode
+ switch factory.cfg.Mode {
+ case "pull":
+ instruments, err := initOTelInstruments()
+ if err != nil {
+ return err
+ }
+ f.otelInstruments = instruments
+ logger.Infof("[MetricReporter] Pull mode enabled")
+
+ case "push":
+ p := prom.NewPrometheus()
+		p.SetPushGatewayUrl(factory.cfg.Push.GatewayURL, factory.cfg.Push.MetricPath)
+ p.SetPushIntervalThreshold(true, factory.cfg.Push.PushInterval)
+ p.SetPushGatewayJob(factory.cfg.Push.JobName)
+ f.promCollector = p
+		logger.Infof("[MetricReporter] Push mode enabled (gateway: %s, interval: %d)",
+			factory.cfg.Push.GatewayURL, factory.cfg.Push.PushInterval)
+ }
+
+ // Both modes need decode and encode filters
chain.AppendDecodeFilters(f)
chain.AppendEncodeFilters(f)
+
return nil
}
-func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
+// Decode handles the decode phase - records start time for both modes.
+func (f *Filter) Decode(ctx *contextHttp.HttpContext) filter.FilterStatus {
+ // Record start time for latency calculation
+ // Both pull and push modes report metrics in Encode phase
f.start = time.Now()
return filter.Continue
}
-func (f *Filter) Encode(c *http.HttpContext) filter.FilterStatus {
+// Encode reports metrics for both modes.
+func (f *Filter) Encode(ctx *contextHttp.HttpContext) filter.FilterStatus {
+ switch f.cfg.Mode {
+ case "pull":
+ return f.reportWithOTel(ctx)
+ case "push":
+ return f.reportWithPrometheus(ctx)
+ }
+
+ return filter.Continue
+}
+
+// reportWithOTel reports metrics using OpenTelemetry.
+func (f *Filter) reportWithOTel(ctx *contextHttp.HttpContext) filter.FilterStatus {
+ if f.otelInstruments == nil {
+		logger.Errorf("[MetricReporter] OpenTelemetry instruments not initialized")
+ errResp := contextHttp.InternalError.New()
+ ctx.SendLocalReply(errResp.Status, errResp.ToJSON())
+ return filter.Stop
+ }
+
+ // Report context metrics dynamically
+ contextMetrics := ctx.GetAllMetrics()
+ if len(contextMetrics) > 0 {
+ meter := global.MeterProvider().Meter("pixiu")
+
+		for _, m := range contextMetrics {
+			attrs := toOTelAttributes(m.Labels)
+
+			switch m.Type {
+			case "counter":
+				counter, err := meter.SyncInt64().Counter(m.Name,
+					instrument.WithDescription(fmt.Sprintf("Context counter: %s", m.Name)))
+				if err != nil {
+					logger.Warnf("[MetricReporter] Failed to create counter %s: %v", m.Name, err)
+					continue
+				}
+				counter.Add(ctx.Ctx, int64(m.Value), attrs...)
+
+			case "histogram":
+				histogram, err := meter.SyncFloat64().Histogram(m.Name,
+					instrument.WithDescription(fmt.Sprintf("Context histogram: %s", m.Name)))
+				if err != nil {
+					logger.Warnf("[MetricReporter] Failed to create histogram %s: %v", m.Name, err)
+					continue
+				}
+				histogram.Record(ctx.Ctx, m.Value, attrs...)
+
+			case "gauge":
+				gauge, err := meter.SyncInt64().UpDownCounter(m.Name,
+					instrument.WithDescription(fmt.Sprintf("Context gauge: %s", m.Name)))
+				if err != nil {
+					logger.Warnf("[MetricReporter] Failed to create gauge %s: %v", m.Name, err)
+					continue
+				}
+				gauge.Add(ctx.Ctx, int64(m.Value), attrs...)
+			}
+		}
+	}
+ // Report built-in metrics
commonAttrs := []attribute.KeyValue{
- attribute.String("code", fmt.Sprintf("%d", c.GetStatusCode())),
- attribute.String("method", c.Request.Method),
- attribute.String("url", c.GetUrl()),
- attribute.String("host", c.Request.Host),
+		attribute.String("code", fmt.Sprintf("%d", ctx.GetStatusCode())),
+ attribute.String("method", ctx.Request.Method),
+ attribute.String("url", ctx.GetUrl()),
+ attribute.String("host", ctx.Request.Host),
}
latency := time.Since(f.start)
- totalCount.Add(c.Ctx, 1, commonAttrs...)
+ f.otelInstruments.totalCount.Add(ctx.Ctx, 1, commonAttrs...)
latencyMilli := latency.Milliseconds()
- totalElapsed.Add(c.Ctx, latencyMilli, commonAttrs...)
- if c.LocalReply() {
- totalError.Add(c.Ctx, 1)
+	f.otelInstruments.totalElapsed.Add(ctx.Ctx, latencyMilli, commonAttrs...)
+
+ if ctx.LocalReply() {
+ f.otelInstruments.totalError.Add(ctx.Ctx, 1)
}
- durationHist.Record(c.Ctx, latencyMilli, commonAttrs...)
- size, err := computeApproximateRequestSize(c.Request)
+	f.otelInstruments.durationHist.Record(ctx.Ctx, latencyMilli, commonAttrs...)
+
+ size, err := computeApproximateRequestSize(ctx.Request)
if err != nil {
- logger.Warn("can not compute request size", err)
+		logger.Warnf("[MetricReporter] Cannot compute request size: %v", err)
} else {
- sizeRequest.Add(c.Ctx, int64(size), commonAttrs...)
+		f.otelInstruments.sizeRequest.Add(ctx.Ctx, int64(size), commonAttrs...)
}
- size, err = computeApproximateResponseSize(c.TargetResp)
+ size, err = computeApproximateResponseSize(ctx.TargetResp)
if err != nil {
- logger.Warn("can not compute response size", err)
+		logger.Warnf("[MetricReporter] Cannot compute response size: %v", err)
} else {
- sizeResponse.Add(c.Ctx, int64(size), commonAttrs...)
+		f.otelInstruments.sizeResponse.Add(ctx.Ctx, int64(size), commonAttrs...)
}
-	logger.Debugf("[Metric] [UPSTREAM] receive request | %d | %s | %s | %s | ", c.GetStatusCode(), latency, c.GetMethod(), c.GetUrl())
+ logger.Debugf("[MetricReporter] [PULL] request | %d | %s | %s | %s |",
+ ctx.GetStatusCode(), latency, ctx.GetMethod(), ctx.GetUrl())
+
return filter.Continue
}
-func computeApproximateResponseSize(res any) (int, error) {
- if res == nil {
- return 0, errors.New("client response is nil")
+// reportWithPrometheus reports metrics using Prometheus.
+func (f *Filter) reportWithPrometheus(ctx *contextHttp.HttpContext) filter.FilterStatus {
+	if f.promCollector == nil {
+		logger.Errorf("[MetricReporter] Prometheus collector not initialized")
+ errResp := contextHttp.InternalError.New()
+ ctx.SendLocalReply(errResp.Status, errResp.ToJSON())
+ return filter.Stop
}
- if unaryResponse, ok := res.(*client.UnaryResponse); ok {
- return len(unaryResponse.Data), nil
+
+ // Process and report custom context metrics
+ contextMetrics := ctx.GetAllMetrics()
+ for _, m := range contextMetrics {
+		if err := f.promCollector.RecordDynamicMetric(m.Name, m.Type, m.Value, m.Labels); err != nil {
+			logger.Warnf("[MetricReporter] Failed to record dynamic metric %s: %v", m.Name, err)
+		} else {
+			logger.Debugf("[MetricReporter] Recorded custom metric: %s=%f (type: %s, labels: %v)",
+				m.Name, m.Value, m.Type, m.Labels)
+ }
}
- return 0, errors.New("response is not of type client.UnaryResponse")
+
+ // Report built-in Prometheus metrics
+ handlerFunc := f.promCollector.HandlerFunc()
+ if err := handlerFunc(ctx); err != nil {
+		logger.Errorf("[MetricReporter] Prometheus handler error: %v", err)
+ }
+
+ logger.Debugf("[MetricReporter] [PUSH] request | %d | %s | %s |",
+ ctx.GetStatusCode(), ctx.GetMethod(), ctx.GetUrl())
+
+ return filter.Continue
+}
+
+// toOTelAttributes converts map[string]string to OpenTelemetry attributes.
+func toOTelAttributes(labels map[string]string) []attribute.KeyValue {
+ attrs := make([]attribute.KeyValue, 0, len(labels))
+ for k, v := range labels {
+ attrs = append(attrs, attribute.String(k, v))
+ }
+ return attrs
}
+// computeApproximateRequestSize computes the approximate size of an HTTP request.
func computeApproximateRequestSize(r *stdhttp.Request) (int, error) {
if r == nil {
- return 0, errors.New("http.Request is null pointer ")
+ return 0, errors.New("http.Request is null pointer")
}
s := 0
if r.URL != nil {
@@ -172,50 +363,13 @@ func computeApproximateRequestSize(r *stdhttp.Request) (int, error) {
return s, nil
}
-func registerOtelMetric() error {
-	meter := global.MeterProvider().Meter("pixiu")
-
-	elapsedCounter, err := meter.SyncInt64().Counter("pixiu_request_elapsed", instrument.WithDescription("request total elapsed in pixiu"))
-	if err != nil {
-		logger.Errorf("register pixiu_request_elapsed metric failed, err: %v", err)
-		return err
-	}
-	totalElapsed = elapsedCounter
-
-	count, err := meter.SyncInt64().Counter("pixiu_request_count", instrument.WithDescription("request total count in pixiu"))
-	if err != nil {
-		logger.Errorf("register pixiu_request_count metric failed, err: %v", err)
-		return err
-	}
-	totalCount = count
-
-	errorCounter, err := meter.SyncInt64().Counter("pixiu_request_error_count", instrument.WithDescription("request error total count in pixiu"))
-	if err != nil {
-		logger.Errorf("register pixiu_request_error_count metric failed, err: %v", err)
-		return err
-	}
-	totalError = errorCounter
-
-	sizeRequest, err = meter.SyncInt64().Counter("pixiu_request_content_length", instrument.WithDescription("request total content length in pixiu"))
-	if err != nil {
-		logger.Errorf("register pixiu_request_content_length metric failed, err: %v", err)
-		return err
-	}
-
-	sizeResponse, err = meter.SyncInt64().Counter("pixiu_response_content_length", instrument.WithDescription("request total content length response in pixiu"))
-	if err != nil {
-		logger.Errorf("register pixiu_response_content_length metric failed, err: %v", err)
-		return err
+// computeApproximateResponseSize computes the approximate size of an HTTP response.
+func computeApproximateResponseSize(res any) (int, error) {
+	if res == nil {
+		return 0, errors.New("client response is nil")
 	}
-
-	durationHist, err = meter.SyncInt64().Histogram(
-		"pixiu_process_time_millicec",
-		instrument.WithDescription("request process time response in pixiu"),
-	)
-	if err != nil {
-		logger.Errorf("register pixiu_process_time_millisec metric failed, err: %v", err)
-		return err
+	if unaryResponse, ok := res.(*client.UnaryResponse); ok {
+		return len(unaryResponse.Data), nil
 	}
-
-	return nil
+	return 0, errors.New("response is not of type client.UnaryResponse")
 }
diff --git a/pkg/filter/metric/metric_test.go b/pkg/filter/metric/metric_test.go
index c9f4b481..263ec7ea 100644
--- a/pkg/filter/metric/metric_test.go
+++ b/pkg/filter/metric/metric_test.go
@@ -18,31 +18,620 @@
package metric
import (
- "bytes"
+ "context"
+ "fmt"
"net/http"
+ "net/url"
"testing"
)
import (
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "go.opentelemetry.io/otel/metric/global"
+ "go.opentelemetry.io/otel/metric/instrument"
+
+ "go.opentelemetry.io/otel/sdk/metric"
)
import (
- "github.com/apache/dubbo-go-pixiu/pkg/context/mock"
+ "github.com/apache/dubbo-go-pixiu/pkg/common/extension/filter"
+ contextHttp "github.com/apache/dubbo-go-pixiu/pkg/context/http"
)
-func TestMetric(t *testing.T) {
- filter := &Filter{}
- err := registerOtelMetric()
- if err != nil {
- t.Fatal(err)
- return
+// mockResponseWriter is a test implementation of http.ResponseWriter
+type mockResponseWriter struct {
+ header http.Header
+ body []byte
+ status int
+}
+
+func (w *mockResponseWriter) Header() http.Header {
+ if w.header == nil {
+ w.header = make(http.Header)
+ }
+ return w.header
+}
+
+func (w *mockResponseWriter) Write(b []byte) (int, error) {
+ w.body = append(w.body, b...)
+ return len(b), nil
+}
+
+func (w *mockResponseWriter) WriteHeader(statusCode int) {
+ w.status = statusCode
+}
+
+// newTestHTTPContext creates a test HTTP context
+func newTestHTTPContext(t *testing.T) *contextHttp.HttpContext {
+ req, err := http.NewRequest("GET", "http://example.com/test", nil)
+ require.NoError(t, err)
+
+ return &contextHttp.HttpContext{
+ Request: req,
+ Writer: &mockResponseWriter{},
+ Ctx: context.Background(),
+ }
+}
+
+// mockFilterChain for testing
+type mockFilterChain struct {
+ decodeFilters []filter.HttpDecodeFilter
+ encodeFilters []filter.HttpEncodeFilter
+}
+
+func (m *mockFilterChain) AppendDecodeFilters(f ...filter.HttpDecodeFilter) {
+ m.decodeFilters = append(m.decodeFilters, f...)
+}
+
+func (m *mockFilterChain) AppendEncodeFilters(f ...filter.HttpEncodeFilter) {
+ m.encodeFilters = append(m.encodeFilters, f...)
+}
+
+func (m *mockFilterChain) OnDecode(ctx *contextHttp.HttpContext) {
+ // Not used in tests
+}
+
+func (m *mockFilterChain) OnEncode(ctx *contextHttp.HttpContext) {
+ // Not used in tests
+}
+
+// TestConfigValidate tests the config validation
+func TestConfigValidate(t *testing.T) {
+ tests := []struct {
+ name string
+ config *Config
+ wantError bool
+ wantMode string
+ }{
+ {
+ name: "empty mode defaults to push",
+ config: &Config{
+ Mode: "",
+ },
+ wantError: false,
+ wantMode: "push",
+ },
+ {
+ name: "invalid mode",
+ config: &Config{
+ Mode: "invalid",
+ },
+ wantError: true,
+ },
+ {
+ name: "valid pull mode",
+ config: &Config{
+ Mode: "pull",
+ },
+ wantError: false,
+ wantMode: "pull",
+ },
+ {
+ name: "valid push mode",
+ config: &Config{
+ Mode: "push",
+ Push: PushConfig{
+ GatewayURL: "http://localhost:9091",
+ JobName: "pixiu",
+ PushInterval: 100,
+ MetricPath: "/metrics",
+ },
+ },
+ wantError: false,
+ wantMode: "push",
+ },
+ {
+ name: "push mode with empty fields applies defaults",
+ config: &Config{
+ Mode: "push",
+ Push: PushConfig{
+ GatewayURL: "",
+ JobName: "",
+ PushInterval: 0,
+ MetricPath: "",
+ },
+ },
+ wantError: false,
+ wantMode: "push",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ factory := &FilterFactory{cfg: tt.config}
+ err := factory.Apply()
+
+ if tt.wantError {
+ assert.Error(t, err)
+ } else {
+ assert.NoError(t, err)
+ if tt.wantMode != "" {
+				assert.Equal(t, tt.wantMode, factory.cfg.Mode, "Mode should be set to %s", tt.wantMode)
+ }
+ }
+ })
+ }
+}
+
+// TestPullModeInitialization tests pull mode initialization
+func TestPullModeInitialization(t *testing.T) {
+ factory := &FilterFactory{
+ cfg: &Config{
+ Mode: "pull",
+ },
+ }
+
+ err := factory.Apply()
+ require.NoError(t, err)
+
+ // Initialization happens in PrepareFilterChain, so we need to test that
+ ctx := newTestHTTPContext(t)
+ chain := &mockFilterChain{}
+ err = factory.PrepareFilterChain(ctx, chain)
+ require.NoError(t, err)
+
+ // Verify filter was created
+ require.Len(t, chain.decodeFilters, 1)
+}
+
+// TestPushModeInitialization tests push mode initialization
+func TestPushModeInitialization(t *testing.T) {
+ factory := &FilterFactory{
+ cfg: &Config{
+ Mode: "push",
+ Push: PushConfig{
+ GatewayURL: "http://localhost:9091",
+ JobName: "test_job",
+ PushInterval: 100,
+ MetricPath: "/metrics",
+ },
+ },
+ }
+
+ err := factory.Apply()
+ require.NoError(t, err)
+
+ // Initialization happens in PrepareFilterChain
+ ctx := newTestHTTPContext(t)
+ chain := &mockFilterChain{}
+ err = factory.PrepareFilterChain(ctx, chain)
+ require.NoError(t, err)
+
+ // Verify filter was created - push mode now has both decode and encode
filters
+ require.Len(t, chain.decodeFilters, 1)
+ require.Len(t, chain.encodeFilters, 1, "Push mode should have both
decode and encode filters")
+}
+
+// TestFilterWithPullMode tests filter encode with pull mode
+func TestFilterWithPullMode(t *testing.T) {
+ factory := &FilterFactory{
+ cfg: &Config{
+ Mode: "pull",
+ },
+ }
+
+ err := factory.Apply()
+ require.NoError(t, err)
+
+ ctx := newTestHTTPContext(t)
+ chain := &mockFilterChain{}
+ err = factory.PrepareFilterChain(ctx, chain)
+ require.NoError(t, err)
+
+ // Record metrics in context
+ ctx.RecordMetric("custom_metric", "counter", 1.0, map[string]string{
+ "key": "value",
+ })
+
+ // Pull mode should have both decode and encode filters
+ require.Len(t, chain.decodeFilters, 1)
+ require.Len(t, chain.encodeFilters, 1)
+
+ // Execute decode (records start time)
+ chain.decodeFilters[0].Decode(ctx)
+
+ // Execute encode (reports metrics)
+ status := chain.encodeFilters[0].Encode(ctx)
+ assert.Equal(t, 0, int(status))
+}
+
+// TestFilterWithPushMode tests filter encode with push mode
+func TestFilterWithPushMode(t *testing.T) {
+ factory := &FilterFactory{
+ cfg: &Config{
+ Mode: "push",
+ Push: PushConfig{
+ GatewayURL: "http://localhost:9091",
+ JobName: "test",
+ PushInterval: 100,
+ MetricPath: "/metrics",
+ },
+ },
+ }
+
+ err := factory.Apply()
+ require.NoError(t, err)
+
+ ctx := newTestHTTPContext(t)
+ chain := &mockFilterChain{}
+ err = factory.PrepareFilterChain(ctx, chain)
+ require.NoError(t, err)
+
+ // Record metrics in context
+ ctx.RecordMetric("custom_metric", "counter", 1.0, nil)
+
+ // Push mode now has both decode and encode filters
+ require.Len(t, chain.decodeFilters, 1)
+ require.Len(t, chain.encodeFilters, 1)
+
+ // Execute decode (only records start time)
+ status := chain.decodeFilters[0].Decode(ctx)
+ assert.Equal(t, 0, int(status))
+
+ // Execute encode (reports metrics)
+ status = chain.encodeFilters[0].Encode(ctx)
+ assert.Equal(t, 0, int(status))
+}
+
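The `ctx.RecordMetric(name, type, value, labels)` calls above are the extension point other filters use to ship custom metrics through this reporter. A hedged sketch of a caller, reusing the test file's `contextHttp` and `filter` imports: the `quotaFilter` type is invented for illustration, and `filter.FilterStatus` is assumed to be the status type behind the `Continue = 0` assertions.

```go
// Illustration only: a hypothetical filter recording a custom metric via
// the same context API the tests above exercise.
type quotaFilter struct{}

func (f *quotaFilter) Decode(ctx *contextHttp.HttpContext) filter.FilterStatus {
	// One counter sample per request; the metric filter's Encode stage
	// reports whatever was recorded on the context.
	ctx.RecordMetric("quota_checks_total", "counter", 1.0, map[string]string{
		"tenant": "demo",
	})
	return filter.Continue
}
```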
+// TestPluginKind tests the plugin kind
+func TestPluginKind(t *testing.T) {
+ plugin := &Plugin{}
+ assert.Equal(t, "dgp.filter.http.metric", plugin.Kind())
+}
+
+// TestCreateFilterFactory tests creating a filter factory
+func TestCreateFilterFactory(t *testing.T) {
+ plugin := &Plugin{}
+ factory, err := plugin.CreateFilterFactory()
+ require.NoError(t, err)
+ require.NotNil(t, factory)
+
+ ff := factory.(*FilterFactory)
+ assert.NotNil(t, ff.cfg)
+}
+
+// TestFilterWithUninitializedReporter tests that filter stops when reporter
is not initialized
+func TestFilterWithUninitializedReporter(t *testing.T) {
+ tests := []struct {
+ name string
+ mode string
+ }{
+ {"pull mode with nil instruments in encode", "pull"},
+ {"push mode with nil collector in encode", "push"},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Create filter with config but nil reporter
+ filter := &Filter{
+ cfg: &Config{Mode: tt.mode},
+ otelInstruments: nil,
+ promCollector: nil,
+ }
+
+ ctx := newTestHTTPContext(t)
+ ctx.RecordMetric("test", "counter", 1.0, nil)
+
+ // Decode should always succeed (just records time)
+ decodeStatus := int(filter.Decode(ctx))
+ assert.Equal(t, 0, decodeStatus) // filter.Continue = 0
+
+ // Encode should fail when reporter is not initialized
+ encodeStatus := int(filter.Encode(ctx))
+ assert.Equal(t, 1, encodeStatus) // filter.Stop = 1
+
+ // Should have sent local reply
+ assert.True(t, ctx.LocalReply())
+ assert.Equal(t, 500, ctx.GetStatusCode())
+ })
+ }
+}
+
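In sketch form, the guard those cases verify: Decode never blocks, while Encode short-circuits with a 500 local reply when no backend was initialized. `SendLocalReply` is an assumed helper name; the test only observes the outcome through `LocalReply()` and `GetStatusCode()`.

```go
// Sketch of the Encode guard, not the commit's actual implementation.
func (f *Filter) encodeGuard(ctx *contextHttp.HttpContext) filter.FilterStatus {
	if f.otelInstruments == nil && f.promCollector == nil {
		ctx.SendLocalReply(500, []byte("metric reporter not initialized"))
		return filter.Stop // 1, as asserted above
	}
	return filter.Continue // 0
}
```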
+// TestDecodeMethod tests that Decode method records start time
+func TestDecodeMethod(t *testing.T) {
+ filter := &Filter{
+ cfg: &Config{Mode: "pull"},
+ }
+ ctx := newTestHTTPContext(t)
+
+ status := filter.Decode(ctx)
+ assert.Equal(t, 0, int(status)) // filter.Continue
+
+ // Verify start time was recorded
+ assert.False(t, filter.start.IsZero())
+}
+
+// TestMetricReporterPullMode tests pull mode with OpenTelemetry.
+func TestMetricReporterPullMode(t *testing.T) {
+ // Create factory with pull mode
+ factory := &FilterFactory{
+ cfg: &Config{
+ Mode: "pull",
+ },
+ }
+
+ // Validate configuration
+ err := factory.Apply()
+ require.NoError(t, err)
+
+ // Create HTTP request
+ req, err := http.NewRequest("POST",
"http://www.dubbogopixiu.com/mock/test?name=tc", nil)
+ require.NoError(t, err)
+
+ ctx := &contextHttp.HttpContext{
+ Request: req,
+ Writer: &mockResponseWriter{},
+ Ctx: context.Background(),
+ }
+
+ // Prepare filter chain
+ chain := &mockFilterChain{}
+ err = factory.PrepareFilterChain(ctx, chain)
+ require.NoError(t, err)
+
+ // Get filters
+ // Pull mode should have both decode and encode filters
+ require.Len(t, chain.decodeFilters, 1)
+ require.Len(t, chain.encodeFilters, 1)
+
+ // Execute decode (records start time)
+ decodeStatus := chain.decodeFilters[0].Decode(ctx)
+ assert.Equal(t, 0, int(decodeStatus))
+
+ // Execute encode (reports metrics)
+ encodeStatus := chain.encodeFilters[0].Encode(ctx)
+ assert.Equal(t, 0, int(encodeStatus))
+
+ t.Log("Pull mode metric reporter test finished successfully")
+}
+
+// TestMetricReporterPushMode tests push mode with Prometheus Push Gateway.
+func TestMetricReporterPushMode(t *testing.T) {
+ // Create factory with push mode
+ factory := &FilterFactory{
+ cfg: &Config{
+ Mode: "push",
+ Push: PushConfig{
+ GatewayURL: "http://127.0.0.1:9091",
+ JobName: "pixiu-test",
+ PushInterval: 10, // Push every 10 requests for
faster testing
+ MetricPath: "/metrics",
+ },
+ },
+ }
+
+ // Validate configuration
+ err := factory.Apply()
+ require.NoError(t, err)
+
+ // Prepare filter chain
+ testURL, _ := url.Parse("http://localhost/_api/health")
+ ctx := &contextHttp.HttpContext{
+ Request: &http.Request{
+ Method: "POST",
+ URL: testURL,
+ Host: "localhost",
+ },
+ Writer: &mockResponseWriter{},
+ Ctx: context.Background(),
+ }
+
+ chain := &mockFilterChain{}
+ err = factory.PrepareFilterChain(ctx, chain)
+ require.NoError(t, err)
+
+ // Push mode now has both decode and encode filters
+ require.Len(t, chain.decodeFilters, 1)
+ require.Len(t, chain.encodeFilters, 1, "Push mode should have both
decode and encode filters")
+
+ // Simulate multiple requests (to trigger push)
+ for i := 0; i < 15; i++ {
+ // Record some context metrics before decode
+ ctx.RecordMetric("api_requests_total", "counter", 1.0,
map[string]string{
+ "api": "health",
+ })
+
+ // Execute decode (only records start time)
+ decodeStatus := chain.decodeFilters[0].Decode(ctx)
+ assert.Equal(t, 0, int(decodeStatus))
+
+ // Execute encode (reports metrics in push mode)
+ encodeStatus := chain.encodeFilters[0].Encode(ctx)
+ assert.Equal(t, 0, int(encodeStatus))
+
+ // Clear metrics for next iteration
+ ctx.ClearMetrics()
+ }
+
+ t.Log("Push mode metric reporter test finished successfully")
+}
+
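`PushInterval: 10` means one push to the gateway per ten requests, so the 15 iterations above cross the threshold at least once. A self-contained sketch of that counter-based trigger using the `prometheus/client_golang` push package; the `pushReporter` type and its fields are illustrative, not this commit's internals.

```go
package main

import (
	"log"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/push"
)

// pushReporter is an invented holder for the push-mode settings.
type pushReporter struct {
	mu           sync.Mutex
	counter      int
	pushInterval int
	gatewayURL   string
	jobName      string
}

// afterRequest pushes the default registry once every pushInterval calls.
func (p *pushReporter) afterRequest() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.counter++
	if p.counter < p.pushInterval {
		return
	}
	p.counter = 0
	if err := push.New(p.gatewayURL, p.jobName).
		Gatherer(prometheus.DefaultGatherer).
		Push(); err != nil {
		log.Printf("push to gateway failed: %v", err)
	}
}

func main() {
	p := &pushReporter{pushInterval: 10, gatewayURL: "http://127.0.0.1:9091", jobName: "pixiu-test"}
	for i := 0; i < 15; i++ {
		p.afterRequest() // pushes once, at the 10th call
	}
}
```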
+// TestOTelInstrumentNoErrorOnDuplicateName tests that the global OpenTelemetry
+// meter provider allows creating instruments with the same name without
+// errors. Each call returns a distinct wrapper object, but none of them
+// triggers a duplicate-registration error.
+func TestOTelInstrumentNoErrorOnDuplicateName(t *testing.T) {
+ // Initialize OTel instruments first
+ factory := &FilterFactory{
+ cfg: &Config{
+ Mode: "pull",
+ },
+ }
+
+ err := factory.Apply()
+ require.NoError(t, err)
+
+ ctx := newTestHTTPContext(t)
+ chain := &mockFilterChain{}
+ err = factory.PrepareFilterChain(ctx, chain)
+ require.NoError(t, err)
+
+ // Get the meter
+ meter := global.MeterProvider().Meter("pixiu")
+
+ // Create the same counter multiple times with the same name
+ // This should NOT cause errors even though it's the same name
+ counter1, err1 := meter.SyncInt64().Counter("test_duplicate_counter",
+ instrument.WithDescription("First call"))
+ require.NoError(t, err1)
+ require.NotNil(t, counter1)
+
+ counter2, err2 := meter.SyncInt64().Counter("test_duplicate_counter",
+ instrument.WithDescription("Second call"))
+ require.NoError(t, err2)
+ require.NotNil(t, counter2)
+
+ counter3, err3 := meter.SyncInt64().Counter("test_duplicate_counter",
+ instrument.WithDescription("Third call"))
+ require.NoError(t, err3)
+ require.NotNil(t, counter3)
+
+ // Test with histogram
+ hist1, err4 := meter.SyncFloat64().Histogram("test_duplicate_histogram",
+ instrument.WithDescription("First histogram"))
+ require.NoError(t, err4)
+ require.NotNil(t, hist1)
+
+ hist2, err5 := meter.SyncFloat64().Histogram("test_duplicate_histogram",
+ instrument.WithDescription("Second histogram"))
+ require.NoError(t, err5)
+ require.NotNil(t, hist2)
+
+ // Test with gauge (UpDownCounter)
+ gauge1, err6 := meter.SyncInt64().UpDownCounter("test_duplicate_gauge",
+ instrument.WithDescription("First gauge"))
+ require.NoError(t, err6)
+ require.NotNil(t, gauge1)
+
+ gauge2, err7 := meter.SyncInt64().UpDownCounter("test_duplicate_gauge",
+ instrument.WithDescription("Second gauge"))
+ require.NoError(t, err7)
+ require.NotNil(t, gauge2)
+
+ // All instruments can be used without errors
+ counter1.Add(ctx.Ctx, 1)
+ counter2.Add(ctx.Ctx, 1)
+ counter3.Add(ctx.Ctx, 1)
+
+ hist1.Record(ctx.Ctx, 10.5)
+ hist2.Record(ctx.Ctx, 20.5)
+
+ gauge1.Add(ctx.Ctx, 1)
+ gauge2.Add(ctx.Ctx, -1)
+
+	t.Log("global meter provider allows the same metric name without errors - no duplicate registration issues")
+}
+
+// TestDynamicMetricsMultipleRequests tests that dynamic metrics from context
+// can be reported multiple times without issues (simulates real scenario).
+func TestDynamicMetricsMultipleRequests(t *testing.T) {
+ factory := &FilterFactory{
+ cfg: &Config{
+ Mode: "pull",
+ },
+ }
+
+ err := factory.Apply()
+ require.NoError(t, err)
+
+ // Simulate 100 requests with the same custom metric
+ for i := 0; i < 100; i++ {
+ ctx := newTestHTTPContext(t)
+
+ // Record the same metric name in every request
+ ctx.RecordMetric("api_requests", "counter", 1.0,
map[string]string{
+ "endpoint": "/test",
+ "method": "GET",
+ })
+ ctx.RecordMetric("api_latency", "histogram", float64(i*10),
map[string]string{
+ "endpoint": "/test",
+ })
+
+ // Create a new chain for each request
+ chain := &mockFilterChain{}
+ err = factory.PrepareFilterChain(ctx, chain)
+ require.NoError(t, err)
+
+ // Execute decode and encode
+ require.Len(t, chain.decodeFilters, 1)
+ require.Len(t, chain.encodeFilters, 1)
+
+ decodeStatus := chain.decodeFilters[0].Decode(ctx)
+ assert.Equal(t, 0, int(decodeStatus))
+
+ encodeStatus := chain.encodeFilters[0].Encode(ctx)
+ assert.Equal(t, 0, int(encodeStatus), "Request #%d: should
handle repeated metric names without error", i)
+ }
+
+ t.Log("Successfully processed 100 requests with same metric names - no
duplicate registration issues")
+}
+
+// TestSDKProviderRejectsRepeatedRegistration tests that when using SDK
MeterProvider directly,
+// repeated registration of the same metric name WILL cause an error.
+// This is different from using global.MeterProvider().
+func TestSDKProviderRejectsRepeatedRegistration(t *testing.T) {
+ reader := metric.NewManualReader()
+ provider := metric.NewMeterProvider(metric.WithReader(reader))
+ meter := provider.Meter("pixiu")
+
+ // First registration - should succeed
+ counter1, err1 := meter.SyncInt64().Counter("test_counter",
+ instrument.WithDescription("First"))
+ require.NoError(t, err1)
+ require.NotNil(t, counter1)
+
+ // Second registration with SAME NAME - should FAIL with SDK provider
+ _, err2 := meter.SyncInt64().Counter("test_counter",
+ instrument.WithDescription("Second"))
+
+ // SDK MeterProvider DOES reject duplicate registration
+ assert.Error(t, err2, "SDK MeterProvider should reject duplicate
instrument registration")
+ assert.Contains(t, err2.Error(), "instrument already registered",
+ "Error should indicate duplicate registration")
+
+ t.Log("✓ Confirmed: SDK MeterProvider rejects duplicate instrument
registration")
+}
+
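Since an SDK MeterProvider rejects re-registration while the global provider tolerates it, code that must run against either can create each instrument once and cache it by name. A sketch against the same pre-1.0 metric API the tests use; it assumes the test file's OpenTelemetry imports plus `sync` and the `syncint64` package.

```go
// Cache instruments by name so each is created exactly once per process,
// which is safe under both the global provider and an SDK MeterProvider.
var counterCache sync.Map // map[string]syncint64.Counter

func cachedCounter(meter metric.Meter, name string) (syncint64.Counter, error) {
	if c, ok := counterCache.Load(name); ok {
		return c.(syncint64.Counter), nil
	}
	c, err := meter.SyncInt64().Counter(name)
	if err != nil {
		return nil, err
	}
	// LoadOrStore keeps a single winner if two goroutines race here.
	actual, _ := counterCache.LoadOrStore(name, c)
	return actual.(syncint64.Counter), nil
}
```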
+// TestGlobalProviderHandlesRepeatedCalls tests that when using
global.MeterProvider(),
+// which is what the actual code uses, repeated calls do NOT cause errors.
+func TestGlobalProviderHandlesRepeatedCalls(t *testing.T) {
+ // Use global meter provider (default noop or whatever is set)
+ meter := global.MeterProvider().Meter("pixiu")
+
+ // Create the same counter multiple times - this is what happens in
actual code
+ for i := 0; i < 10; i++ {
+ counter, err := meter.SyncInt64().Counter("global_test_counter",
+ instrument.WithDescription(fmt.Sprintf("Iteration %d",
i)))
+
+ // With global provider, this should NOT error
+ assert.NoError(t, err, "global.MeterProvider() should handle
repeated instrument creation")
+ assert.NotNil(t, counter)
+
+ // Use the counter
+ counter.Add(context.Background(), int64(i+1))
}
- request, err := http.NewRequest("POST",
"http://www.dubbogopixiu.com/mock/test?name=tc",
bytes.NewReader([]byte("{\"id\":\"12345\"}")))
- assert.NoError(t, err)
- c := mock.GetMockHTTPContext(request)
- filter.Decode(c)
- filter.Encode(c)
- t.Log("log filter test is finished")
+ t.Log("✓ Confirmed: global.MeterProvider() handles repeated instrument
creation without errors")
+ t.Log("✓ This explains why the actual code (which uses
global.MeterProvider) works fine")
}
diff --git a/pkg/filter/prometheus/config.go b/pkg/filter/prometheus/config.go
index d61cd540..83024855 100644
--- a/pkg/filter/prometheus/config.go
+++ b/pkg/filter/prometheus/config.go
@@ -18,10 +18,14 @@
package prometheus
type (
+ // MetricCollectConfiguration is the configuration for the legacy
Prometheus filter.
+ // Deprecated: Use dgp.filter.http.metric instead.
MetricCollectConfiguration struct {
Rules MetricCollectRule `yaml:"metric_collect_rules"
json:"metric_collect_rules"`
}
+ // MetricCollectRule defines the metric collection rules.
+ // Deprecated: Use dgp.filter.http.metric instead.
MetricCollectRule struct {
MetricPath string `json:"metric_path,omitempty"
yaml:"metric_path,omitempty"`
// Push Gateway URL in format http://domain:port
diff --git a/pkg/filter/prometheus/metric.go b/pkg/filter/prometheus/metric.go
index 044c4d00..08e9e081 100644
--- a/pkg/filter/prometheus/metric.go
+++ b/pkg/filter/prometheus/metric.go
@@ -15,6 +15,10 @@
* limitations under the License.
*/
+// Package prometheus provides the legacy Prometheus metric filter.
+//
+// Deprecated: This filter is deprecated and will be removed in a future
version.
+// Use dgp.filter.http.metric instead.
package prometheus
import (
diff --git a/pkg/prometheus/prometheus.go b/pkg/prometheus/prometheus.go
index 273e1798..40b73f15 100644
--- a/pkg/prometheus/prometheus.go
+++ b/pkg/prometheus/prometheus.go
@@ -79,6 +79,7 @@ var resSzBuckets = []float64{1.0 * KB, 2.0 * KB, 5.0 * KB,
10.0 * KB, 100 * KB,
// counter, counter_vec, gauge, gauge_vec,
// histogram, histogram_vec, summary, summary_vec
+// Deprecated: Use reqCntNew instead. This will be removed in future versions.
var reqCnt = &Metric{
ID: "reqCnt",
Name: "requests_total",
@@ -87,10 +88,34 @@ var reqCnt = &Metric{
Args: []string{"code", "method", "host", "url"},
}
+var reqCntNew = &Metric{
+ ID: "reqCntNew",
+ Name: "request_count",
+ Description: "request total count in pixiu",
+ Type: "counter_vec",
+ Args: []string{"code", "method", "host", "url"},
+}
+
+var reqElapsed = &Metric{
+ ID: "reqElapsed",
+ Name: "request_elapsed",
+	Description: "total request elapsed time in pixiu (milliseconds)",
+ Type: "counter_vec",
+ Args: []string{"code", "method", "host", "url"},
+}
+
+var reqErrorCnt = &Metric{
+ ID: "reqErrorCnt",
+ Name: "request_error_count",
+ Description: "request error total count in pixiu",
+ Type: "counter_vec",
+ Args: []string{"code", "method", "host", "url"},
+}
+
var reqDur = &Metric{
ID: "reqDur",
- Name: "request_duration_seconds",
- Description: "The HTTP request latencies in seconds.",
+ Name: "process_time_millisec",
+	Description: "request processing time in pixiu (milliseconds)",
Args: []string{"code", "method", "url"},
Type: "histogram_vec",
Buckets: reqDurBuckets,
@@ -98,24 +123,25 @@ var reqDur = &Metric{
var resSz = &Metric{
ID: "resSz",
- Name: "response_size_bytes",
- Description: "The HTTP response sizes in bytes.",
+ Name: "response_content_length",
+	Description: "total response content length in pixiu (bytes)",
Args: []string{"code", "method", "url"},
- Type: "histogram_vec",
- Buckets: resSzBuckets,
+ Type: "counter_vec",
}
var reqSz = &Metric{
ID: "reqSz",
- Name: "request_size_bytes",
- Description: "The HTTP request sizes in bytes.",
+ Name: "request_content_length",
+	Description: "total request content length in pixiu (bytes)",
Args: []string{"code", "method", "url"},
- Type: "histogram_vec",
- Buckets: reqSzBuckets,
+ Type: "counter_vec",
}
var standardMetrics = []*Metric{
- reqCnt,
+ reqCnt, // Deprecated: for backward compatibility
+ reqCntNew, // New unified metric name
+ reqElapsed,
+ reqErrorCnt,
reqDur,
resSz,
reqSz,
@@ -202,9 +228,13 @@ func NewMetric(m *Metric, subsystem string)
prometheus.Collector {
type RequestCounterLabelMappingFunc func(c *contextHttp.HttpContext) string
type Prometheus struct {
- reqCnt *prometheus.CounterVec
- reqDur, reqSz, resSz *prometheus.HistogramVec
- Ppg PushGateway
+ reqCnt *prometheus.CounterVec // Deprecated
+ reqCntNew *prometheus.CounterVec // New unified name
+ reqElapsed *prometheus.CounterVec
+ reqErrorCnt *prometheus.CounterVec
+ reqDur *prometheus.HistogramVec
+ reqSz, resSz *prometheus.CounterVec
+ Ppg PushGateway
MetricsList []*Metric
MetricsPath string
@@ -215,6 +245,11 @@ type Prometheus struct {
URLLabelFromContext string
Datacontext context.Context
+
+ // Dynamic metrics storage for custom metrics
+ dynamicCounters sync.Map // map[string]*prometheus.CounterVec
+ dynamicGauges sync.Map // map[string]*prometheus.GaugeVec
+ dynamicHistograms sync.Map // map[string]*prometheus.HistogramVec
}
// PushGateway contains the configuration for pushing to a Prometheus
pushgateway (optional)
@@ -256,12 +291,18 @@ func (p *Prometheus) registerMetrics() {
case reqCnt:
p.reqCnt = metric.(*prometheus.CounterVec)
+ case reqCntNew:
+ p.reqCntNew = metric.(*prometheus.CounterVec)
+ case reqElapsed:
+ p.reqElapsed = metric.(*prometheus.CounterVec)
+ case reqErrorCnt:
+ p.reqErrorCnt = metric.(*prometheus.CounterVec)
case reqDur:
p.reqDur = metric.(*prometheus.HistogramVec)
case resSz:
- p.resSz = metric.(*prometheus.HistogramVec)
+ p.resSz = metric.(*prometheus.CounterVec)
case reqSz:
- p.reqSz = metric.(*prometheus.HistogramVec)
+ p.reqSz = metric.(*prometheus.CounterVec)
}
metricDef.MetricCollector = metric
}
@@ -341,7 +382,7 @@ func (p *Prometheus) HandlerFunc() ContextHandlerFunc {
start := time.Now()
reqSz, err1 := computeApproximateRequestSize(c.Request)
//fmt.Println("reqSz", reqSz)
- elapsed := float64(time.Since(start)) / float64(time.Second)
+ elapsed := float64(time.Since(start).Milliseconds())
//fmt.Println("elapsed ", elapsed)
url := p.RequestCounterURLLabelMappingFunc(c)
//fmt.Println("url ", url)
@@ -349,15 +390,28 @@ func (p *Prometheus) HandlerFunc() ContextHandlerFunc {
//fmt.Println("statusStr", statusStr)
method := c.GetMethod()
//fmt.Println("method ", method)
+ host := p.RequestCounterHostLabelMappingFunc(c)
+
+ // Record metrics aligned with Pull mode
+ // Update both old (deprecated) and new metric names for
backward compatibility
+ p.reqCnt.WithLabelValues(statusStr, method, host, url).Inc()
// Deprecated: will be removed
+ p.reqCntNew.WithLabelValues(statusStr, method, host, url).Inc()
// New unified name
+ p.reqElapsed.WithLabelValues(statusStr, method, host,
url).Add(elapsed)
p.reqDur.WithLabelValues(statusStr, method,
url).Observe(elapsed)
- p.reqCnt.WithLabelValues(statusStr, method,
p.RequestCounterHostLabelMappingFunc(c), url).Inc()
+
if err1 == nil {
- p.reqSz.WithLabelValues(statusStr, method,
url).Observe(float64(reqSz))
+ p.reqSz.WithLabelValues(statusStr, method,
url).Add(float64(reqSz))
}
resSz, err2 := computeApproximateResponseSize(c.TargetResp)
if err2 == nil {
- p.resSz.WithLabelValues(statusStr, method,
url).Observe(float64(resSz))
+ p.resSz.WithLabelValues(statusStr, method,
url).Add(float64(resSz))
+ }
+
+ // Record errors
+ if c.LocalReply() {
+ p.reqErrorCnt.WithLabelValues(statusStr, method, host,
url).Inc()
}
+
p.Ppg.mutex.Lock()
p.Ppg.counter = p.Ppg.counter + 1
defer p.Ppg.mutex.Unlock()
@@ -398,3 +452,155 @@ func computeApproximateResponseSize(res any) (int, error)
{
}
return 0, errors.New("response is not of type client.UnaryResponse")
}
+
+// RecordDynamicMetric records a dynamic metric based on type (counter, gauge,
histogram)
+func (p *Prometheus) RecordDynamicMetric(name string, metricType string, value
float64, labels map[string]string) error {
+	// Extract label keys in sorted order. Map iteration order is random in
+	// Go, so sorting keeps metricKey stable across calls and keeps the
+	// label-value order aligned with the keys used at registration.
+	// (Assumes "sort" is in this file's import list.)
+	labelKeys := make([]string, 0, len(labels))
+	for k := range labels {
+		labelKeys = append(labelKeys, k)
+	}
+	sort.Strings(labelKeys)
+	labelValues := make([]string, 0, len(labels))
+	for _, k := range labelKeys {
+		labelValues = append(labelValues, labels[k])
+	}
+
+ switch metricType {
+ case "counter":
+ return p.recordDynamicCounter(name, value, labelKeys,
labelValues)
+ case "gauge":
+ return p.recordDynamicGauge(name, value, labelKeys, labelValues)
+ case "histogram":
+ return p.recordDynamicHistogram(name, value, labelKeys,
labelValues)
+ default:
+ return errors.New("unsupported metric type: " + metricType)
+ }
+}
+
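For reference, the call shape this dispatcher expects, mirroring how the tests drive it through the context (the name, value, and labels below are illustrative):

```go
// One sample per call: counters Add, gauges Set, histograms Observe.
// Errors surface for unsupported type strings or failed registration.
if err := p.RecordDynamicMetric("api_requests_total", "counter", 1.0,
	map[string]string{"api": "health"}); err != nil {
	log.Printf("record dynamic metric: %v", err)
}
```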
+// recordDynamicCounter records a counter metric
+func (p *Prometheus) recordDynamicCounter(name string, value float64,
labelKeys, labelValues []string) error {
+ // Create a unique key for this metric with its label keys
+ metricKey := name + "_" + joinLabels(labelKeys)
+
+ // Try to load existing counter
+ if metric, ok := p.dynamicCounters.Load(metricKey); ok {
+ counter := metric.(*prometheus.CounterVec)
+ counter.WithLabelValues(labelValues...).Add(value)
+ return nil
+ }
+
+ // Create new counter
+ counter := prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Subsystem: p.Subsystem,
+ Name: name,
+ Help: "Dynamic counter: " + name,
+ },
+ labelKeys,
+ )
+
+ // Register the metric
+ if err := prometheus.Register(counter); err != nil {
+ // Metric might already be registered, try to use it
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ counter = are.ExistingCollector.(*prometheus.CounterVec)
+ } else {
+ return err
+ }
+ }
+
+ // Store for future use
+ p.dynamicCounters.Store(metricKey, counter)
+ counter.WithLabelValues(labelValues...).Add(value)
+ return nil
+}
+
+// recordDynamicGauge records a gauge metric
+func (p *Prometheus) recordDynamicGauge(name string, value float64, labelKeys,
labelValues []string) error {
+ // Create a unique key for this metric with its label keys
+ metricKey := name + "_" + joinLabels(labelKeys)
+
+ // Try to load existing gauge
+ if metric, ok := p.dynamicGauges.Load(metricKey); ok {
+ gauge := metric.(*prometheus.GaugeVec)
+ gauge.WithLabelValues(labelValues...).Set(value)
+ return nil
+ }
+
+ // Create new gauge
+ gauge := prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Subsystem: p.Subsystem,
+ Name: name,
+ Help: "Dynamic gauge: " + name,
+ },
+ labelKeys,
+ )
+
+ // Register the metric
+ if err := prometheus.Register(gauge); err != nil {
+ // Metric might already be registered, try to use it
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ gauge = are.ExistingCollector.(*prometheus.GaugeVec)
+ } else {
+ return err
+ }
+ }
+
+ // Store for future use
+ p.dynamicGauges.Store(metricKey, gauge)
+ gauge.WithLabelValues(labelValues...).Set(value)
+ return nil
+}
+
+// recordDynamicHistogram records a histogram metric
+func (p *Prometheus) recordDynamicHistogram(name string, value float64,
labelKeys, labelValues []string) error {
+ // Create a unique key for this metric with its label keys
+ metricKey := name + "_" + joinLabels(labelKeys)
+
+ // Try to load existing histogram
+ if metric, ok := p.dynamicHistograms.Load(metricKey); ok {
+ histogram := metric.(*prometheus.HistogramVec)
+ histogram.WithLabelValues(labelValues...).Observe(value)
+ return nil
+ }
+
+ // Create new histogram with default buckets
+ histogram := prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Subsystem: p.Subsystem,
+ Name: name,
+ Help: "Dynamic histogram: " + name,
+ Buckets: prometheus.DefBuckets,
+ },
+ labelKeys,
+ )
+
+ // Register the metric
+ if err := prometheus.Register(histogram); err != nil {
+ // Metric might already be registered, try to use it
+ if are, ok := err.(prometheus.AlreadyRegisteredError); ok {
+ histogram =
are.ExistingCollector.(*prometheus.HistogramVec)
+ } else {
+ return err
+ }
+ }
+
+ // Store for future use
+ p.dynamicHistograms.Store(metricKey, histogram)
+ histogram.WithLabelValues(labelValues...).Observe(value)
+ return nil
+}
+
+// joinLabels creates a consistent key from label keys
+func joinLabels(labels []string) string {
+ if len(labels) == 0 {
+ return "no_labels"
+ }
+ result := ""
+ for i, label := range labels {
+ if i > 0 {
+ result += "_"
+ }
+ result += label
+ }
+ return result
+}
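`joinLabels` is the empty-slice guard plus an underscore join; with the standard library it reduces to the sketch below. Note that the underscore separator can collide for keys that themselves contain underscores (e.g. `["a_b"]` vs `["a", "b"]` map to the same cache key), so a separator that cannot appear in label names would be safer.

```go
// Behaviorally identical formulation using strings.Join.
func joinLabels(labels []string) string {
	if len(labels) == 0 {
		return "no_labels"
	}
	return strings.Join(labels, "_")
}
```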