This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push:
new fb6641dc9 add prometheus pushgateway support (#2415)
fb6641dc9 is described below
commit fb6641dc99c23a10f9dabf4e753397ac889c549b
Author: foghost <[email protected]>
AuthorDate: Mon Sep 4 14:06:07 2023 +0800
add prometheus pushgateway support (#2415)
---
common/extension/metrics.go | 42 ---------------
common/extension/metrics_test.go | 49 -----------------
config/metric_config.go | 82 +++++++++++++++++-----------
metrics/prometheus/registry.go | 96 +++++++++++++++++++++++++++------
metrics/prometheus/registry_test.go | 87 +++++++++++++++++++++++++++---
metrics/prometheus/reporter.go | 103 ------------------------------------
6 files changed, 213 insertions(+), 246 deletions(-)
diff --git a/common/extension/metrics.go b/common/extension/metrics.go
deleted file mode 100644
index 639d92106..000000000
--- a/common/extension/metrics.go
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package extension
-
-import (
- "dubbo.apache.org/dubbo-go/v3/metrics"
-)
-
-// we couldn't store the instance because the some instance may initialize
before loading configuration
-// so lazy initialization will be better.
-var metricReporterMap = make(map[string]func(config *metrics.ReporterConfig)
metrics.Reporter, 4)
-
-// SetMetricReporter sets a reporter with the @name
-func SetMetricReporter(name string, reporterFunc func(config
*metrics.ReporterConfig) metrics.Reporter) {
- metricReporterMap[name] = reporterFunc
-}
-
-// GetMetricReporter finds the reporter with @name.
-// if not found, it will panic.
-// we should know that this method usually is called when system starts, so we
should panic
-func GetMetricReporter(name string, config *metrics.ReporterConfig)
metrics.Reporter {
- reporterFunc, found := metricReporterMap[name]
- if !found {
- panic("Cannot find the reporter with name: " + name)
- }
- return reporterFunc(config)
-}
diff --git a/common/extension/metrics_test.go b/common/extension/metrics_test.go
deleted file mode 100644
index 64f76422c..000000000
--- a/common/extension/metrics_test.go
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package extension
-
-import (
- "testing"
-)
-
-import (
- "github.com/stretchr/testify/assert"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/metrics"
-)
-
-func TestGetMetricReporter(t *testing.T) {
- reporter := &mockReporter{}
- name := "mock"
- SetMetricReporter(name, func(config *metrics.ReporterConfig)
metrics.Reporter {
- return reporter
- })
- res := GetMetricReporter(name, metrics.NewReporterConfig())
- assert.Equal(t, reporter, res)
-}
-
-type mockReporter struct{}
-
-// implement the interface of Reporter
-func (m *mockReporter) StartServer(config *metrics.ReporterConfig) {
-}
-
-func (m *mockReporter) ShutdownServer() {
-}
diff --git a/config/metric_config.go b/config/metric_config.go
index b0e45d06a..0859b57cc 100644
--- a/config/metric_config.go
+++ b/config/metric_config.go
@@ -24,45 +24,56 @@ import (
import (
"github.com/creasty/defaults"
- "github.com/dubbogo/gost/log/logger"
-
"github.com/pkg/errors"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metrics"
)
// MetricConfig This is the config struct for all metrics implementation
type MetricConfig struct {
- Mode string `default:"pull" yaml:"mode"
json:"mode,omitempty" property:"mode"` // push or pull,
- Namespace string `default:"dubbo" yaml:"namespace"
json:"namespace,omitempty" property:"namespace"`
- Enable *bool `default:"false" yaml:"enable"
json:"enable,omitempty" property:"enable"`
- Port string `default:"9090" yaml:"port"
json:"port,omitempty" property:"port"`
- Path string `default:"/metrics" yaml:"path"
json:"path,omitempty" property:"path"`
- PushGatewayAddress string `default:"" yaml:"push-gateway-address"
json:"push-gateway-address,omitempty" property:"push-gateway-address"`
- SummaryMaxAge int64 `default:"600000000000"
yaml:"summary-max-age" json:"summary-max-age,omitempty"
property:"summary-max-age"`
- Protocol string `default:"prometheus" yaml:"protocol"
json:"protocol,omitempty" property:"protocol"`
- rootConfig *RootConfig
+ Enable *bool `default:"false" yaml:"enable"
json:"enable,omitempty" property:"enable"`
+ Port string `default:"9090" yaml:"port"
json:"port,omitempty" property:"port"`
+ Path string `default:"/metrics" yaml:"path"
json:"path,omitempty" property:"path"`
+ Protocol string `default:"prometheus" yaml:"protocol"
json:"protocol,omitempty" property:"protocol"`
+ Prometheus *PrometheusConfig `yaml:"prometheus" json:"prometheus"
property:"prometheus"`
+ Aggregation *AggregateConfig `yaml:"aggregation" json:"aggregation"
property:"aggregation"`
+ rootConfig *RootConfig
+}
+
+type AggregateConfig struct {
+ Enabled *bool `default:"false" yaml:"enabled"
json:"enabled,omitempty" property:"enabled"`
+ BucketNum int `default:"10" yaml:"bucket-num"
json:"bucket-num,omitempty" property:"bucket-num"`
+ TimeWindowSeconds int `default:"120" yaml:"time-window-seconds"
json:"time-window-seconds,omitempty" property:"time-window-seconds"`
+}
+
+type PrometheusConfig struct {
+ Exporter *Exporter `yaml:"exporter"
json:"exporter,omitempty" property:"exporter"`
+ Pushgateway *PushgatewayConfig `yaml:"pushgateway"
json:"pushgateway,omitempty" property:"pushgateway"`
+}
+
+type Exporter struct {
+ Enabled *bool `default:"false" yaml:"enabled" json:"enabled,omitempty"
property:"enabled"`
+}
+
+type PushgatewayConfig struct {
+ Enabled *bool `default:"false" yaml:"enabled"
json:"enabled,omitempty" property:"enabled"`
+ BaseUrl string `default:"" yaml:"base-url"
json:"base-url,omitempty" property:"base-url"`
+ Job string `default:"default_dubbo_job" yaml:"job"
json:"job,omitempty" property:"job"`
+ Username string `default:"" yaml:"username"
json:"username,omitempty" property:"username"`
+ Password string `default:"" yaml:"password"
json:"password,omitempty" property:"password"`
+ PushInterval int `default:"30" yaml:"push-interval"
json:"push-interval,omitempty" property:"push-interval"`
}
func (mc *MetricConfig) ToReporterConfig() *metrics.ReporterConfig {
defaultMetricsReportConfig := metrics.NewReporterConfig()
- if mc.Mode == metrics.ReportModePush {
- defaultMetricsReportConfig.Mode = metrics.ReportModePush
- }
- if mc.Namespace != "" {
- defaultMetricsReportConfig.Namespace = mc.Namespace
- }
defaultMetricsReportConfig.Enable = *mc.Enable
defaultMetricsReportConfig.Port = mc.Port
defaultMetricsReportConfig.Path = mc.Path
- defaultMetricsReportConfig.PushGatewayAddress = mc.PushGatewayAddress
- defaultMetricsReportConfig.SummaryMaxAge = mc.SummaryMaxAge
defaultMetricsReportConfig.Protocol = mc.Protocol
return defaultMetricsReportConfig
}
@@ -78,8 +89,6 @@ func (mc *MetricConfig) Init(rc *RootConfig) error {
return err
}
mc.rootConfig = rc
- config := mc.ToReporterConfig()
- extension.GetMetricReporter(mc.Protocol, config)
metrics.Init(mc.toURL())
return nil
}
@@ -98,14 +107,7 @@ func (mcb *MetricConfigBuilder) Build() *MetricConfig {
// DynamicUpdateProperties dynamically update properties.
func (mc *MetricConfig) DynamicUpdateProperties(newMetricConfig *MetricConfig)
{
- if newMetricConfig != nil {
- if newMetricConfig.Enable != mc.Enable {
- mc.Enable = newMetricConfig.Enable
- logger.Infof("MetricConfig's Enable was dynamically
updated, new value:%v", mc.Enable)
-
- extension.GetMetricReporter(mc.Protocol,
mc.ToReporterConfig())
- }
- }
+ // TODO update
}
//
prometheus://localhost:9090?&histogram.enabled=false&prometheus.exporter.enabled=false
@@ -116,5 +118,25 @@ func (mc *MetricConfig) toURL() *common.URL {
url.SetParam(constant.PrometheusExporterMetricsPathKey, mc.Path)
url.SetParam(constant.ApplicationKey, mc.rootConfig.Application.Name)
url.SetParam(constant.AppVersionKey, mc.rootConfig.Application.Version)
+ if mc.Aggregation != nil {
+ url.SetParam(constant.AggregationEnabledKey,
strconv.FormatBool(*mc.Aggregation.Enabled))
+ url.SetParam(constant.AggregationBucketNumKey,
strconv.Itoa(mc.Aggregation.BucketNum))
+ url.SetParam(constant.AggregationTimeWindowSecondsKey,
strconv.Itoa(mc.Aggregation.TimeWindowSeconds))
+ }
+ if mc.Prometheus != nil {
+ if mc.Prometheus.Exporter != nil {
+ exporter := mc.Prometheus.Exporter
+ url.SetParam(constant.PrometheusExporterEnabledKey,
strconv.FormatBool(*exporter.Enabled || *mc.Enable)) // for compatibility
+ }
+ if mc.Prometheus.Pushgateway != nil {
+ pushGateWay := mc.Prometheus.Pushgateway
+ url.SetParam(constant.PrometheusPushgatewayEnabledKey,
strconv.FormatBool(*pushGateWay.Enabled))
+ url.SetParam(constant.PrometheusPushgatewayBaseUrlKey,
pushGateWay.BaseUrl)
+ url.SetParam(constant.PrometheusPushgatewayUsernameKey,
pushGateWay.Username)
+ url.SetParam(constant.PrometheusPushgatewayPasswordKey,
pushGateWay.Password)
+
url.SetParam(constant.PrometheusPushgatewayPushIntervalKey,
strconv.Itoa(pushGateWay.PushInterval))
+ url.SetParam(constant.PrometheusPushgatewayJobKey,
pushGateWay.Job)
+ }
+ }
return url
}
diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go
index 2a4c71243..f84f2ad55 100644
--- a/metrics/prometheus/registry.go
+++ b/metrics/prometheus/registry.go
@@ -19,11 +19,18 @@ package prometheus
import (
"bytes"
+ "context"
+ "net/http"
"sync"
+ "time"
)
import (
+ "github.com/dubbogo/gost/log/logger"
+
prom "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "github.com/prometheus/client_golang/prometheus/push"
"github.com/prometheus/common/expfmt"
)
@@ -31,21 +38,28 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metrics"
)
func init() {
metrics.SetRegistry(constant.ProtocolPrometheus, func(url *common.URL)
metrics.MetricRegistry {
- return &promMetricRegistry{r: prom.DefaultRegisterer}
+ return &promMetricRegistry{r: prom.DefaultRegisterer, gather:
prom.DefaultGatherer, url: url}
})
}
type promMetricRegistry struct {
- r prom.Registerer // for convenience of testing
- vecs sync.Map
+ r prom.Registerer
+ gather prom.Gatherer
+ vecs sync.Map
+ url *common.URL
+}
+
+func NewPromMetricRegistry(reg *prom.Registry, url *common.URL)
*promMetricRegistry {
+ return &promMetricRegistry{r: reg, gather: reg, url: url}
}
-func (p *promMetricRegistry) getOrComputeVec(key string, supplier func()
interface{}) interface{} {
+func (p *promMetricRegistry) getOrComputeVec(key string, supplier func()
prom.Collector) interface{} {
v, ok := p.vecs.Load(key)
if !ok {
v, ok = p.vecs.LoadOrStore(key, supplier())
@@ -57,7 +71,7 @@ func (p *promMetricRegistry) getOrComputeVec(key string,
supplier func() interfa
}
func (p *promMetricRegistry) Counter(m *metrics.MetricId)
metrics.CounterMetric {
- vec := p.getOrComputeVec(m.Name, func() interface{} {
+ vec := p.getOrComputeVec(m.Name, func() prom.Collector {
return prom.NewCounterVec(prom.CounterOpts{
Name: m.Name,
Help: m.Desc,
@@ -67,7 +81,7 @@ func (p *promMetricRegistry) Counter(m *metrics.MetricId)
metrics.CounterMetric
}
func (p *promMetricRegistry) Gauge(m *metrics.MetricId) metrics.GaugeMetric {
- vec := p.getOrComputeVec(m.Name, func() interface{} {
+ vec := p.getOrComputeVec(m.Name, func() prom.Collector {
return prom.NewGaugeVec(prom.GaugeOpts{
Name: m.Name,
Help: m.Desc,
@@ -77,7 +91,7 @@ func (p *promMetricRegistry) Gauge(m *metrics.MetricId)
metrics.GaugeMetric {
}
func (p *promMetricRegistry) Histogram(m *metrics.MetricId)
metrics.ObservableMetric {
- vec := p.getOrComputeVec(m.Name, func() interface{} {
+ vec := p.getOrComputeVec(m.Name, func() prom.Collector {
return prom.NewHistogramVec(prom.HistogramOpts{
Name: m.Name,
Help: m.Desc,
@@ -87,7 +101,7 @@ func (p *promMetricRegistry) Histogram(m *metrics.MetricId)
metrics.ObservableMe
}
func (p *promMetricRegistry) Summary(m *metrics.MetricId)
metrics.ObservableMetric {
- vec := p.getOrComputeVec(m.Name, func() interface{} {
+ vec := p.getOrComputeVec(m.Name, func() prom.Collector {
return prom.NewSummaryVec(prom.SummaryOpts{
Name: m.Name,
Help: m.Desc,
@@ -98,11 +112,16 @@ func (p *promMetricRegistry) Summary(m *metrics.MetricId)
metrics.ObservableMetr
func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts *metrics.RtOpts)
metrics.ObservableMetric {
key := m.Name
- var supplier func() interface{}
+ var supplier func() prom.Collector
if opts != nil && opts.Aggregate {
key += "_aggregate"
- supplier = func() interface{} {
- // TODO set default aggregate config from config
+ if opts.BucketNum == 0 {
+ opts.BucketNum =
p.url.GetParamByIntValue(constant.AggregationBucketNumKey,
constant.AggregationDefaultBucketNum)
+ }
+ if opts.TimeWindowSeconds == 0 {
+ opts.TimeWindowSeconds =
p.url.GetParamInt(constant.AggregationTimeWindowSecondsKey,
constant.AggregationDefaultTimeWindowSeconds)
+ }
+ supplier = func() prom.Collector {
return NewAggRtVec(&RtOpts{
Name: m.Name,
Help: m.Desc,
@@ -111,7 +130,7 @@ func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts
*metrics.RtOpts) metri
}, m.TagKeys())
}
} else {
- supplier = func() interface{} {
+ supplier = func() prom.Collector {
return NewRtVec(&RtOpts{
Name: m.Name,
Help: m.Desc,
@@ -123,12 +142,59 @@ func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts
*metrics.RtOpts) metri
}
func (p *promMetricRegistry) Export() {
- // use promauto export global, TODO move here
+ if p.url.GetParamBool(constant.PrometheusExporterEnabledKey, false) {
+ go func() {
+ mux := http.NewServeMux()
+ path :=
p.url.GetParam(constant.PrometheusDefaultMetricsPath,
constant.PrometheusDefaultMetricsPath)
+ port :=
p.url.GetParam(constant.PrometheusExporterMetricsPortKey,
constant.PrometheusDefaultMetricsPort)
+ mux.Handle(path, promhttp.InstrumentMetricHandler(p.r,
promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{})))
+ srv := &http.Server{Addr: ":" + port, Handler: mux}
+ extension.AddCustomShutdownCallback(func() {
+ ctx, cancel :=
context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := srv.Shutdown(ctx); nil != err {
+ logger.Fatalf("prometheus server
shutdown failed, err: %v", err)
+ } else {
+ logger.Info("prometheus server
gracefully shutdown success")
+ }
+ })
+ logger.Infof("prometheus endpoint :%s%s", port, path)
+ if err := srv.ListenAndServe(); err != nil && err !=
http.ErrServerClosed { // except Shutdown or Close
+ logger.Errorf("new prometheus server with error
= %v", err)
+ }
+ }()
+ }
+ if p.url.GetParamBool(constant.PrometheusPushgatewayEnabledKey, false) {
+ baseUrl, exist :=
p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey)
+ if !exist {
+ logger.Error("no pushgateway url found in config path:
metrics.prometheus.pushgateway.bash-url, please check your config file")
+ return
+ }
+ username :=
p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
+ password :=
p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
+ job := p.url.GetParam(constant.PrometheusPushgatewayJobKey,
constant.PrometheusDefaultJobName)
+ pushInterval :=
p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey,
constant.PrometheusDefaultPushInterval)
+ pusher := push.New(baseUrl, job).Gatherer(p.gather)
+ if len(username) != 0 {
+ pusher.BasicAuth(username, password)
+ }
+ logger.Infof("prometheus pushgateway will push to %s every %d
seconds", baseUrl, pushInterval)
+ ticker := time.NewTicker(time.Duration(pushInterval) *
time.Second)
+ go func() {
+ for range ticker.C {
+ err := pusher.Add()
+ if err != nil {
+ logger.Errorf("push metric data to
prometheus push gateway error", err)
+ } else {
+ logger.Debugf("prometheus pushgateway
push to %s success", baseUrl)
+ }
+ }
+ }()
+ }
}
func (p *promMetricRegistry) Scrape() (string, error) {
- r := p.r.(prom.Gatherer)
- gathering, err := r.Gather()
+ gathering, err := p.gather.Gather()
if err != nil {
return "", err
}
diff --git a/metrics/prometheus/registry_test.go
b/metrics/prometheus/registry_test.go
index 7e7ac2afc..a3e5e6636 100644
--- a/metrics/prometheus/registry_test.go
+++ b/metrics/prometheus/registry_test.go
@@ -18,8 +18,11 @@
package prometheus
import (
+ "io"
+ "net/http"
"sync"
"testing"
+ "time"
)
import (
@@ -29,16 +32,32 @@ import (
)
import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/metrics"
)
var (
tags = map[string]string{"app": "dubbo", "version": "1.0.0"}
metricId = &metrics.MetricId{Name: "dubbo_request", Desc: "request",
Tags: tags}
+ url = common.NewURLWithOptions(
+ common.WithProtocol(constant.ProtocolPrometheus),
+ common.WithParamsValue(constant.PrometheusExporterEnabledKey,
"true"),
+
common.WithParamsValue(constant.PrometheusExporterMetricsPortKey,
constant.PrometheusDefaultMetricsPort),
+
common.WithParamsValue(constant.PrometheusExporterMetricsPathKey,
constant.PrometheusDefaultMetricsPath),
+ common.WithParamsValue(constant.ApplicationKey, "dubbo"),
+ common.WithParamsValue(constant.AppVersionKey, "1.0.0"),
+
common.WithParamsValue(constant.PrometheusPushgatewayEnabledKey, "true"),
+
common.WithParamsValue(constant.PrometheusPushgatewayBaseUrlKey,
"localhost:9091"),
+
common.WithParamsValue(constant.PrometheusPushgatewayUsernameKey, ""),
+
common.WithParamsValue(constant.PrometheusPushgatewayPasswordKey, ""),
+
common.WithParamsValue(constant.PrometheusPushgatewayPushIntervalKey, "2"),
+ common.WithParamsValue(constant.PrometheusPushgatewayJobKey,
"dubbo-push"),
+ )
)
func TestPromMetricRegistryCounter(t *testing.T) {
- p := &promMetricRegistry{r: prom.NewRegistry()}
+ p := NewPromMetricRegistry(prom.NewRegistry(), url)
p.Counter(metricId).Inc()
text, err := p.Scrape()
assert.Nil(t, err)
@@ -47,7 +66,7 @@ func TestPromMetricRegistryCounter(t *testing.T) {
}
func TestPromMetricRegistryGauge(t *testing.T) {
- p := &promMetricRegistry{r: prom.NewRegistry()}
+ p := NewPromMetricRegistry(prom.NewRegistry(), url)
p.Gauge(metricId).Set(100)
text, err := p.Scrape()
assert.Nil(t, err)
@@ -57,7 +76,7 @@ func TestPromMetricRegistryGauge(t *testing.T) {
}
func TestPromMetricRegistryHistogram(t *testing.T) {
- p := &promMetricRegistry{r: prom.NewRegistry()}
+ p := NewPromMetricRegistry(prom.NewRegistry(), url)
p.Histogram(metricId).Observe(100)
text, err := p.Scrape()
assert.Nil(t, err)
@@ -68,7 +87,7 @@ func TestPromMetricRegistryHistogram(t *testing.T) {
}
func TestPromMetricRegistrySummary(t *testing.T) {
- p := &promMetricRegistry{r: prom.NewRegistry()}
+ p := NewPromMetricRegistry(prom.NewRegistry(), url)
p.Summary(metricId).Observe(100)
text, err := p.Scrape()
assert.Nil(t, err)
@@ -78,7 +97,7 @@ func TestPromMetricRegistrySummary(t *testing.T) {
}
func TestPromMetricRegistryRt(t *testing.T) {
- p := &promMetricRegistry{r: prom.NewRegistry()}
+ p := NewPromMetricRegistry(prom.NewRegistry(), url)
for i := 0; i < 10; i++ {
p.Rt(metricId, &metrics.RtOpts{}).Observe(10 * float64(i))
}
@@ -90,7 +109,7 @@ func TestPromMetricRegistryRt(t *testing.T) {
assert.Contains(t, text, "# HELP dubbo_request_min Min request\n# TYPE
dubbo_request_min gauge\ndubbo_request_min{app=\"dubbo\",version=\"1.0.0\"} 0")
assert.Contains(t, text, "# HELP dubbo_request_sum Sum request\n# TYPE
dubbo_request_sum gauge\ndubbo_request_sum{app=\"dubbo\",version=\"1.0.0\"}
450")
- p = &promMetricRegistry{r: prom.NewRegistry()}
+ p = NewPromMetricRegistry(prom.NewRegistry(), url)
for i := 0; i < 10; i++ {
p.Rt(metricId, &metrics.RtOpts{Aggregate: true, BucketNum: 10,
TimeWindowSeconds: 60}).Observe(10 * float64(i))
}
@@ -102,7 +121,7 @@ func TestPromMetricRegistryRt(t *testing.T) {
}
func TestPromMetricRegistryCounterConcurrent(t *testing.T) {
- p := &promMetricRegistry{r: prom.NewRegistry()}
+ p := NewPromMetricRegistry(prom.NewRegistry(), url)
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
@@ -117,3 +136,57 @@ func TestPromMetricRegistryCounterConcurrent(t *testing.T)
{
assert.Contains(t, text, "# HELP dubbo_request request\n# TYPE
dubbo_request counter")
assert.Contains(t, text, `dubbo_request{app="dubbo",version="1.0.0"}
10`)
}
+
+func TestPromMetricRegistryExport(t *testing.T) {
+ p := NewPromMetricRegistry(prom.NewRegistry(), url)
+ go func() {
+ for {
+ p.Rt(metricId, &metrics.RtOpts{}).Observe(10 *
float64(1))
+ time.Sleep(1 * time.Second)
+ }
+ }()
+ p.Export()
+ // test push
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ err :=
http.ListenAndServe(url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, ""),
+ http.HandlerFunc(func(w http.ResponseWriter, r
*http.Request) {
+ bodyBytes, err := io.ReadAll(r.Body)
+ assert.Nil(t, err)
+ text := string(bodyBytes)
+ assert.Contains(t, text, "dubbo_request_avg")
+ wg.Done()
+ }))
+ assert.Nil(t, err)
+ }()
+ timeout :=
url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey,
constant.PrometheusDefaultPushInterval)
+ if waitTimeout(&wg, time.Duration(timeout+1)*time.Second) {
+ assert.Fail(t, "wait pushgateway data timeout")
+ }
+ // test pull
+ resp, err := http.Get("http://localhost:" +
+ url.GetParam(constant.PrometheusExporterMetricsPortKey,
constant.PrometheusDefaultMetricsPort) +
+ url.GetParam(constant.PrometheusExporterMetricsPathKey,
constant.PrometheusDefaultMetricsPath),
+ )
+ assert.Nil(t, err)
+ defer resp.Body.Close()
+ bodyBytes, err := io.ReadAll(resp.Body)
+ assert.Nil(t, err)
+ text := string(bodyBytes)
+ assert.Contains(t, text, "dubbo_request_avg")
+}
+
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+ c := make(chan struct{})
+ go func() {
+ defer close(c)
+ wg.Wait()
+ }()
+ select {
+ case <-c:
+ return false // completed normally
+ case <-time.After(timeout):
+ return true // timed out
+ }
+}
diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go
deleted file mode 100644
index a5e88eff2..000000000
--- a/metrics/prometheus/reporter.go
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package prometheus
-
-import (
- "context"
- "net/http"
- "sync"
-)
-
-import (
- "github.com/dubbogo/gost/log/logger"
-
- "github.com/prometheus/client_golang/prometheus/promhttp"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/metrics"
-)
-
-var (
- reporterInstance *reporter
- reporterInitOnce sync.Once
-)
-
-const (
- reporterName = "prometheus"
-)
-
-// should initialize after loading configuration
-func init() {
- // newPrometheusReporter()
- extension.SetMetricReporter(reporterName, newPrometheusReporter)
-}
-
-// reporter will export the metrics to Prometheus
-// if you want to use this feature, you need to initialize your prometheus.
-// https://prometheus.io/docs/guides/go-application/
-type reporter struct {
- reporterServer *http.Server
- reporterConfig *metrics.ReporterConfig
- namespace string
-}
-
-// newPrometheusReporter create a new prometheus server or push gateway
reporter
-func newPrometheusReporter(reporterConfig *metrics.ReporterConfig)
metrics.Reporter {
- if reporterInstance == nil {
- reporterInitOnce.Do(func() {
- reporterInstance = &reporter{
- reporterConfig: reporterConfig,
- namespace: reporterConfig.Namespace,
- }
- })
- }
-
- if reporterConfig.Enable {
- if reporterConfig.Mode == metrics.ReportModePull {
- go reporterInstance.StartServer(reporterConfig)
- }
- // todo pushgateway support
- } else {
- reporterInstance.ShutdownServer()
- }
-
- return reporterInstance
-}
-
-func (r *reporter) StartServer(reporterConfig *metrics.ReporterConfig) {
- // start server
- mux := http.NewServeMux()
- mux.Handle(reporterConfig.Path, promhttp.Handler())
- reporterInstance.reporterServer = &http.Server{Addr: ":" +
reporterConfig.Port, Handler: mux}
- logger.Infof("new prometheus reporter with port = %s, path = %s",
reporterConfig.Port, reporterConfig.Path)
- if err := reporterInstance.reporterServer.ListenAndServe(); err != nil {
- logger.Warnf("new prometheus reporter with error = %s", err)
- }
-}
-
-func (r *reporter) ShutdownServer() {
- if reporterInstance.reporterServer != nil {
- err :=
reporterInstance.reporterServer.Shutdown(context.Background())
- if err != nil {
- logger.Errorf("shutdown prometheus reporter with error
= %s, prometheus reporter close now", err)
- reporterInstance.reporterServer.Close()
- }
- }
-}