This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/main by this push:
     new fb6641dc9 add prometheus pushgateway support (#2415)
fb6641dc9 is described below

commit fb6641dc99c23a10f9dabf4e753397ac889c549b
Author: foghost <[email protected]>
AuthorDate: Mon Sep 4 14:06:07 2023 +0800

    add prometheus pushgateway support (#2415)
---
 common/extension/metrics.go         |  42 ---------------
 common/extension/metrics_test.go    |  49 -----------------
 config/metric_config.go             |  82 +++++++++++++++++-----------
 metrics/prometheus/registry.go      |  96 +++++++++++++++++++++++++++------
 metrics/prometheus/registry_test.go |  87 +++++++++++++++++++++++++++---
 metrics/prometheus/reporter.go      | 103 ------------------------------------
 6 files changed, 213 insertions(+), 246 deletions(-)

diff --git a/common/extension/metrics.go b/common/extension/metrics.go
deleted file mode 100644
index 639d92106..000000000
--- a/common/extension/metrics.go
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package extension
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/metrics"
-)
-
-// we couldn't store the instance because the some instance may initialize 
before loading configuration
-// so lazy initialization will be better.
-var metricReporterMap = make(map[string]func(config *metrics.ReporterConfig) 
metrics.Reporter, 4)
-
-// SetMetricReporter sets a reporter with the @name
-func SetMetricReporter(name string, reporterFunc func(config 
*metrics.ReporterConfig) metrics.Reporter) {
-       metricReporterMap[name] = reporterFunc
-}
-
-// GetMetricReporter finds the reporter with @name.
-// if not found, it will panic.
-// we should know that this method usually is called when system starts, so we 
should panic
-func GetMetricReporter(name string, config *metrics.ReporterConfig) 
metrics.Reporter {
-       reporterFunc, found := metricReporterMap[name]
-       if !found {
-               panic("Cannot find the reporter with name: " + name)
-       }
-       return reporterFunc(config)
-}
diff --git a/common/extension/metrics_test.go b/common/extension/metrics_test.go
deleted file mode 100644
index 64f76422c..000000000
--- a/common/extension/metrics_test.go
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package extension
-
-import (
-       "testing"
-)
-
-import (
-       "github.com/stretchr/testify/assert"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/metrics"
-)
-
-func TestGetMetricReporter(t *testing.T) {
-       reporter := &mockReporter{}
-       name := "mock"
-       SetMetricReporter(name, func(config *metrics.ReporterConfig) 
metrics.Reporter {
-               return reporter
-       })
-       res := GetMetricReporter(name, metrics.NewReporterConfig())
-       assert.Equal(t, reporter, res)
-}
-
-type mockReporter struct{}
-
-// implement the interface of Reporter
-func (m *mockReporter) StartServer(config *metrics.ReporterConfig) {
-}
-
-func (m *mockReporter) ShutdownServer() {
-}
diff --git a/config/metric_config.go b/config/metric_config.go
index b0e45d06a..0859b57cc 100644
--- a/config/metric_config.go
+++ b/config/metric_config.go
@@ -24,45 +24,56 @@ import (
 import (
        "github.com/creasty/defaults"
 
-       "github.com/dubbogo/gost/log/logger"
-
        "github.com/pkg/errors"
 )
 
 import (
        "dubbo.apache.org/dubbo-go/v3/common"
        "dubbo.apache.org/dubbo-go/v3/common/constant"
-       "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/metrics"
 )
 
 // MetricConfig This is the config struct for all metrics implementation
 type MetricConfig struct {
-       Mode               string `default:"pull" yaml:"mode" 
json:"mode,omitempty" property:"mode"` // push or pull,
-       Namespace          string `default:"dubbo" yaml:"namespace" 
json:"namespace,omitempty" property:"namespace"`
-       Enable             *bool  `default:"false" yaml:"enable" 
json:"enable,omitempty" property:"enable"`
-       Port               string `default:"9090" yaml:"port" 
json:"port,omitempty" property:"port"`
-       Path               string `default:"/metrics" yaml:"path" 
json:"path,omitempty" property:"path"`
-       PushGatewayAddress string `default:"" yaml:"push-gateway-address" 
json:"push-gateway-address,omitempty" property:"push-gateway-address"`
-       SummaryMaxAge      int64  `default:"600000000000" 
yaml:"summary-max-age" json:"summary-max-age,omitempty" 
property:"summary-max-age"`
-       Protocol           string `default:"prometheus" yaml:"protocol" 
json:"protocol,omitempty" property:"protocol"`
-       rootConfig         *RootConfig
+       Enable      *bool             `default:"false" yaml:"enable" 
json:"enable,omitempty" property:"enable"`
+       Port        string            `default:"9090" yaml:"port" 
json:"port,omitempty" property:"port"`
+       Path        string            `default:"/metrics" yaml:"path" 
json:"path,omitempty" property:"path"`
+       Protocol    string            `default:"prometheus" yaml:"protocol" 
json:"protocol,omitempty" property:"protocol"`
+       Prometheus  *PrometheusConfig `yaml:"prometheus" json:"prometheus" 
property:"prometheus"`
+       Aggregation *AggregateConfig  `yaml:"aggregation" json:"aggregation" 
property:"aggregation"`
+       rootConfig  *RootConfig
+}
+
+type AggregateConfig struct {
+       Enabled           *bool `default:"false" yaml:"enabled" 
json:"enabled,omitempty" property:"enabled"`
+       BucketNum         int   `default:"10" yaml:"bucket-num" 
json:"bucket-num,omitempty" property:"bucket-num"`
+       TimeWindowSeconds int   `default:"120" yaml:"time-window-seconds" 
json:"time-window-seconds,omitempty" property:"time-window-seconds"`
+}
+
+type PrometheusConfig struct {
+       Exporter    *Exporter          `yaml:"exporter" 
json:"exporter,omitempty" property:"exporter"`
+       Pushgateway *PushgatewayConfig `yaml:"pushgateway" 
json:"pushgateway,omitempty" property:"pushgateway"`
+}
+
+type Exporter struct {
+       Enabled *bool `default:"false" yaml:"enabled" json:"enabled,omitempty" 
property:"enabled"`
+}
+
+type PushgatewayConfig struct {
+       Enabled      *bool  `default:"false" yaml:"enabled" 
json:"enabled,omitempty" property:"enabled"`
+       BaseUrl      string `default:"" yaml:"base-url" 
json:"base-url,omitempty" property:"base-url"`
+       Job          string `default:"default_dubbo_job" yaml:"job" 
json:"job,omitempty" property:"job"`
+       Username     string `default:"" yaml:"username" 
json:"username,omitempty" property:"username"`
+       Password     string `default:"" yaml:"password" 
json:"password,omitempty" property:"password"`
+       PushInterval int    `default:"30" yaml:"push-interval" 
json:"push-interval,omitempty" property:"push-interval"`
 }
 
 func (mc *MetricConfig) ToReporterConfig() *metrics.ReporterConfig {
        defaultMetricsReportConfig := metrics.NewReporterConfig()
-       if mc.Mode == metrics.ReportModePush {
-               defaultMetricsReportConfig.Mode = metrics.ReportModePush
-       }
-       if mc.Namespace != "" {
-               defaultMetricsReportConfig.Namespace = mc.Namespace
-       }
 
        defaultMetricsReportConfig.Enable = *mc.Enable
        defaultMetricsReportConfig.Port = mc.Port
        defaultMetricsReportConfig.Path = mc.Path
-       defaultMetricsReportConfig.PushGatewayAddress = mc.PushGatewayAddress
-       defaultMetricsReportConfig.SummaryMaxAge = mc.SummaryMaxAge
        defaultMetricsReportConfig.Protocol = mc.Protocol
        return defaultMetricsReportConfig
 }
@@ -78,8 +89,6 @@ func (mc *MetricConfig) Init(rc *RootConfig) error {
                return err
        }
        mc.rootConfig = rc
-       config := mc.ToReporterConfig()
-       extension.GetMetricReporter(mc.Protocol, config)
        metrics.Init(mc.toURL())
        return nil
 }
@@ -98,14 +107,7 @@ func (mcb *MetricConfigBuilder) Build() *MetricConfig {
 
 // DynamicUpdateProperties dynamically update properties.
 func (mc *MetricConfig) DynamicUpdateProperties(newMetricConfig *MetricConfig) 
{
-       if newMetricConfig != nil {
-               if newMetricConfig.Enable != mc.Enable {
-                       mc.Enable = newMetricConfig.Enable
-                       logger.Infof("MetricConfig's Enable was dynamically 
updated, new value:%v", mc.Enable)
-
-                       extension.GetMetricReporter(mc.Protocol, 
mc.ToReporterConfig())
-               }
-       }
+       // TODO update
 }
 
 // prometheus://localhost:9090?&histogram.enabled=false&prometheus.exporter.enabled=false
@@ -116,5 +118,25 @@ func (mc *MetricConfig) toURL() *common.URL {
        url.SetParam(constant.PrometheusExporterMetricsPathKey, mc.Path)
        url.SetParam(constant.ApplicationKey, mc.rootConfig.Application.Name)
        url.SetParam(constant.AppVersionKey, mc.rootConfig.Application.Version)
+       if mc.Aggregation != nil {
+               url.SetParam(constant.AggregationEnabledKey, 
strconv.FormatBool(*mc.Aggregation.Enabled))
+               url.SetParam(constant.AggregationBucketNumKey, 
strconv.Itoa(mc.Aggregation.BucketNum))
+               url.SetParam(constant.AggregationTimeWindowSecondsKey, 
strconv.Itoa(mc.Aggregation.TimeWindowSeconds))
+       }
+       if mc.Prometheus != nil {
+               if mc.Prometheus.Exporter != nil {
+                       exporter := mc.Prometheus.Exporter
+                       url.SetParam(constant.PrometheusExporterEnabledKey, 
strconv.FormatBool(*exporter.Enabled || *mc.Enable)) // for compatibility
+               }
+               if mc.Prometheus.Pushgateway != nil {
+                       pushGateWay := mc.Prometheus.Pushgateway
+                       url.SetParam(constant.PrometheusPushgatewayEnabledKey, 
strconv.FormatBool(*pushGateWay.Enabled))
+                       url.SetParam(constant.PrometheusPushgatewayBaseUrlKey, 
pushGateWay.BaseUrl)
+                       url.SetParam(constant.PrometheusPushgatewayUsernameKey, 
pushGateWay.Username)
+                       url.SetParam(constant.PrometheusPushgatewayPasswordKey, 
pushGateWay.Password)
+                       url.SetParam(constant.PrometheusPushgatewayPushIntervalKey, strconv.Itoa(pushGateWay.PushInterval))
+                       url.SetParam(constant.PrometheusPushgatewayJobKey, 
pushGateWay.Job)
+               }
+       }
        return url
 }
diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go
index 2a4c71243..f84f2ad55 100644
--- a/metrics/prometheus/registry.go
+++ b/metrics/prometheus/registry.go
@@ -19,11 +19,18 @@ package prometheus
 
 import (
        "bytes"
+       "context"
+       "net/http"
        "sync"
+       "time"
 )
 
 import (
+       "github.com/dubbogo/gost/log/logger"
+
        prom "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+       "github.com/prometheus/client_golang/prometheus/push"
 
        "github.com/prometheus/common/expfmt"
 )
@@ -31,21 +38,28 @@ import (
 import (
        "dubbo.apache.org/dubbo-go/v3/common"
        "dubbo.apache.org/dubbo-go/v3/common/constant"
+       "dubbo.apache.org/dubbo-go/v3/common/extension"
        "dubbo.apache.org/dubbo-go/v3/metrics"
 )
 
 func init() {
        metrics.SetRegistry(constant.ProtocolPrometheus, func(url *common.URL) 
metrics.MetricRegistry {
-               return &promMetricRegistry{r: prom.DefaultRegisterer}
+               return &promMetricRegistry{r: prom.DefaultRegisterer, gather: 
prom.DefaultGatherer, url: url}
        })
 }
 
 type promMetricRegistry struct {
-       r    prom.Registerer // for convenience of testing
-       vecs sync.Map
+       r      prom.Registerer
+       gather prom.Gatherer
+       vecs   sync.Map
+       url    *common.URL
+}
+
+func NewPromMetricRegistry(reg *prom.Registry, url *common.URL) 
*promMetricRegistry {
+       return &promMetricRegistry{r: reg, gather: reg, url: url}
 }
 
-func (p *promMetricRegistry) getOrComputeVec(key string, supplier func() 
interface{}) interface{} {
+func (p *promMetricRegistry) getOrComputeVec(key string, supplier func() 
prom.Collector) interface{} {
        v, ok := p.vecs.Load(key)
        if !ok {
                v, ok = p.vecs.LoadOrStore(key, supplier())
@@ -57,7 +71,7 @@ func (p *promMetricRegistry) getOrComputeVec(key string, 
supplier func() interfa
 }
 
 func (p *promMetricRegistry) Counter(m *metrics.MetricId) 
metrics.CounterMetric {
-       vec := p.getOrComputeVec(m.Name, func() interface{} {
+       vec := p.getOrComputeVec(m.Name, func() prom.Collector {
                return prom.NewCounterVec(prom.CounterOpts{
                        Name: m.Name,
                        Help: m.Desc,
@@ -67,7 +81,7 @@ func (p *promMetricRegistry) Counter(m *metrics.MetricId) 
metrics.CounterMetric
 }
 
 func (p *promMetricRegistry) Gauge(m *metrics.MetricId) metrics.GaugeMetric {
-       vec := p.getOrComputeVec(m.Name, func() interface{} {
+       vec := p.getOrComputeVec(m.Name, func() prom.Collector {
                return prom.NewGaugeVec(prom.GaugeOpts{
                        Name: m.Name,
                        Help: m.Desc,
@@ -77,7 +91,7 @@ func (p *promMetricRegistry) Gauge(m *metrics.MetricId) 
metrics.GaugeMetric {
 }
 
 func (p *promMetricRegistry) Histogram(m *metrics.MetricId) 
metrics.ObservableMetric {
-       vec := p.getOrComputeVec(m.Name, func() interface{} {
+       vec := p.getOrComputeVec(m.Name, func() prom.Collector {
                return prom.NewHistogramVec(prom.HistogramOpts{
                        Name: m.Name,
                        Help: m.Desc,
@@ -87,7 +101,7 @@ func (p *promMetricRegistry) Histogram(m *metrics.MetricId) 
metrics.ObservableMe
 }
 
 func (p *promMetricRegistry) Summary(m *metrics.MetricId) 
metrics.ObservableMetric {
-       vec := p.getOrComputeVec(m.Name, func() interface{} {
+       vec := p.getOrComputeVec(m.Name, func() prom.Collector {
                return prom.NewSummaryVec(prom.SummaryOpts{
                        Name: m.Name,
                        Help: m.Desc,
@@ -98,11 +112,16 @@ func (p *promMetricRegistry) Summary(m *metrics.MetricId) 
metrics.ObservableMetr
 
 func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts *metrics.RtOpts) 
metrics.ObservableMetric {
        key := m.Name
-       var supplier func() interface{}
+       var supplier func() prom.Collector
        if opts != nil && opts.Aggregate {
                key += "_aggregate"
-               supplier = func() interface{} {
-                       // TODO set default aggregate config from config
+               if opts.BucketNum == 0 {
+                       opts.BucketNum = 
p.url.GetParamByIntValue(constant.AggregationBucketNumKey, 
constant.AggregationDefaultBucketNum)
+               }
+               if opts.TimeWindowSeconds == 0 {
+                       opts.TimeWindowSeconds = 
p.url.GetParamInt(constant.AggregationTimeWindowSecondsKey, 
constant.AggregationDefaultTimeWindowSeconds)
+               }
+               supplier = func() prom.Collector {
                        return NewAggRtVec(&RtOpts{
                                Name:              m.Name,
                                Help:              m.Desc,
@@ -111,7 +130,7 @@ func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts 
*metrics.RtOpts) metri
                        }, m.TagKeys())
                }
        } else {
-               supplier = func() interface{} {
+               supplier = func() prom.Collector {
                        return NewRtVec(&RtOpts{
                                Name: m.Name,
                                Help: m.Desc,
@@ -123,12 +142,59 @@ func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts 
*metrics.RtOpts) metri
 }
 
 func (p *promMetricRegistry) Export() {
-       // use promauto export global, TODO move here
+       if p.url.GetParamBool(constant.PrometheusExporterEnabledKey, false) {
+               go func() {
+                       mux := http.NewServeMux()
+                       path := 
p.url.GetParam(constant.PrometheusDefaultMetricsPath, 
constant.PrometheusDefaultMetricsPath)
+                       port := 
p.url.GetParam(constant.PrometheusExporterMetricsPortKey, 
constant.PrometheusDefaultMetricsPort)
+                       mux.Handle(path, promhttp.InstrumentMetricHandler(p.r, 
promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{})))
+                       srv := &http.Server{Addr: ":" + port, Handler: mux}
+                       extension.AddCustomShutdownCallback(func() {
+                               ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second)
+                               defer cancel()
+                               if err := srv.Shutdown(ctx); nil != err {
+                                       logger.Fatalf("prometheus server 
shutdown failed, err: %v", err)
+                               } else {
+                                       logger.Info("prometheus server 
gracefully shutdown success")
+                               }
+                       })
+                       logger.Infof("prometheus endpoint :%s%s", port, path)
+                       if err := srv.ListenAndServe(); err != nil && err != 
http.ErrServerClosed { // except Shutdown or Close
+                               logger.Errorf("new prometheus server with error 
= %v", err)
+                       }
+               }()
+       }
+       if p.url.GetParamBool(constant.PrometheusPushgatewayEnabledKey, false) {
+               baseUrl, exist := 
p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey)
+               if !exist {
+                       logger.Error("no pushgateway url found in config path: 
metrics.prometheus.pushgateway.bash-url, please check your config file")
+                       return
+               }
+               username := 
p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
+               password := 
p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
+               job := p.url.GetParam(constant.PrometheusPushgatewayJobKey, 
constant.PrometheusDefaultJobName)
+               pushInterval := 
p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey, 
constant.PrometheusDefaultPushInterval)
+               pusher := push.New(baseUrl, job).Gatherer(p.gather)
+               if len(username) != 0 {
+                       pusher.BasicAuth(username, password)
+               }
+               logger.Infof("prometheus pushgateway will push to %s every %d 
seconds", baseUrl, pushInterval)
+               ticker := time.NewTicker(time.Duration(pushInterval) * 
time.Second)
+               go func() {
+                       for range ticker.C {
+                               err := pusher.Add()
+                               if err != nil {
+                                       logger.Errorf("push metric data to 
prometheus push gateway error", err)
+                               } else {
+                                       logger.Debugf("prometheus pushgateway 
push to %s success", baseUrl)
+                               }
+                       }
+               }()
+       }
 }
 
 func (p *promMetricRegistry) Scrape() (string, error) {
-       r := p.r.(prom.Gatherer)
-       gathering, err := r.Gather()
+       gathering, err := p.gather.Gather()
        if err != nil {
                return "", err
        }
diff --git a/metrics/prometheus/registry_test.go 
b/metrics/prometheus/registry_test.go
index 7e7ac2afc..a3e5e6636 100644
--- a/metrics/prometheus/registry_test.go
+++ b/metrics/prometheus/registry_test.go
@@ -18,8 +18,11 @@
 package prometheus
 
 import (
+       "io"
+       "net/http"
        "sync"
        "testing"
+       "time"
 )
 
 import (
@@ -29,16 +32,32 @@ import (
 )
 
 import (
+       "dubbo.apache.org/dubbo-go/v3/common"
+       "dubbo.apache.org/dubbo-go/v3/common/constant"
        "dubbo.apache.org/dubbo-go/v3/metrics"
 )
 
 var (
        tags     = map[string]string{"app": "dubbo", "version": "1.0.0"}
        metricId = &metrics.MetricId{Name: "dubbo_request", Desc: "request", 
Tags: tags}
+       url      = common.NewURLWithOptions(
+               common.WithProtocol(constant.ProtocolPrometheus),
+               common.WithParamsValue(constant.PrometheusExporterEnabledKey, "true"),
+               common.WithParamsValue(constant.PrometheusExporterMetricsPortKey, constant.PrometheusDefaultMetricsPort),
+               common.WithParamsValue(constant.PrometheusExporterMetricsPathKey, constant.PrometheusDefaultMetricsPath),
+               common.WithParamsValue(constant.ApplicationKey, "dubbo"),
+               common.WithParamsValue(constant.AppVersionKey, "1.0.0"),
+               common.WithParamsValue(constant.PrometheusPushgatewayEnabledKey, "true"),
+               common.WithParamsValue(constant.PrometheusPushgatewayBaseUrlKey, "localhost:9091"),
+               common.WithParamsValue(constant.PrometheusPushgatewayUsernameKey, ""),
+               common.WithParamsValue(constant.PrometheusPushgatewayPasswordKey, ""),
+               common.WithParamsValue(constant.PrometheusPushgatewayPushIntervalKey, "2"),
+               common.WithParamsValue(constant.PrometheusPushgatewayJobKey, "dubbo-push"),
+       )
 )
 
 func TestPromMetricRegistryCounter(t *testing.T) {
-       p := &promMetricRegistry{r: prom.NewRegistry()}
+       p := NewPromMetricRegistry(prom.NewRegistry(), url)
        p.Counter(metricId).Inc()
        text, err := p.Scrape()
        assert.Nil(t, err)
@@ -47,7 +66,7 @@ func TestPromMetricRegistryCounter(t *testing.T) {
 }
 
 func TestPromMetricRegistryGauge(t *testing.T) {
-       p := &promMetricRegistry{r: prom.NewRegistry()}
+       p := NewPromMetricRegistry(prom.NewRegistry(), url)
        p.Gauge(metricId).Set(100)
        text, err := p.Scrape()
        assert.Nil(t, err)
@@ -57,7 +76,7 @@ func TestPromMetricRegistryGauge(t *testing.T) {
 }
 
 func TestPromMetricRegistryHistogram(t *testing.T) {
-       p := &promMetricRegistry{r: prom.NewRegistry()}
+       p := NewPromMetricRegistry(prom.NewRegistry(), url)
        p.Histogram(metricId).Observe(100)
        text, err := p.Scrape()
        assert.Nil(t, err)
@@ -68,7 +87,7 @@ func TestPromMetricRegistryHistogram(t *testing.T) {
 }
 
 func TestPromMetricRegistrySummary(t *testing.T) {
-       p := &promMetricRegistry{r: prom.NewRegistry()}
+       p := NewPromMetricRegistry(prom.NewRegistry(), url)
        p.Summary(metricId).Observe(100)
        text, err := p.Scrape()
        assert.Nil(t, err)
@@ -78,7 +97,7 @@ func TestPromMetricRegistrySummary(t *testing.T) {
 }
 
 func TestPromMetricRegistryRt(t *testing.T) {
-       p := &promMetricRegistry{r: prom.NewRegistry()}
+       p := NewPromMetricRegistry(prom.NewRegistry(), url)
        for i := 0; i < 10; i++ {
                p.Rt(metricId, &metrics.RtOpts{}).Observe(10 * float64(i))
        }
@@ -90,7 +109,7 @@ func TestPromMetricRegistryRt(t *testing.T) {
        assert.Contains(t, text, "# HELP dubbo_request_min Min request\n# TYPE 
dubbo_request_min gauge\ndubbo_request_min{app=\"dubbo\",version=\"1.0.0\"} 0")
        assert.Contains(t, text, "# HELP dubbo_request_sum Sum request\n# TYPE 
dubbo_request_sum gauge\ndubbo_request_sum{app=\"dubbo\",version=\"1.0.0\"} 
450")
 
-       p = &promMetricRegistry{r: prom.NewRegistry()}
+       p = NewPromMetricRegistry(prom.NewRegistry(), url)
        for i := 0; i < 10; i++ {
                p.Rt(metricId, &metrics.RtOpts{Aggregate: true, BucketNum: 10, 
TimeWindowSeconds: 60}).Observe(10 * float64(i))
        }
@@ -102,7 +121,7 @@ func TestPromMetricRegistryRt(t *testing.T) {
 }
 
 func TestPromMetricRegistryCounterConcurrent(t *testing.T) {
-       p := &promMetricRegistry{r: prom.NewRegistry()}
+       p := NewPromMetricRegistry(prom.NewRegistry(), url)
        var wg sync.WaitGroup
        for i := 0; i < 10; i++ {
                wg.Add(1)
@@ -117,3 +136,57 @@ func TestPromMetricRegistryCounterConcurrent(t *testing.T) 
{
        assert.Contains(t, text, "# HELP dubbo_request request\n# TYPE 
dubbo_request counter")
        assert.Contains(t, text, `dubbo_request{app="dubbo",version="1.0.0"} 
10`)
 }
+
+func TestPromMetricRegistryExport(t *testing.T) {
+       p := NewPromMetricRegistry(prom.NewRegistry(), url)
+       go func() {
+               for {
+                       p.Rt(metricId, &metrics.RtOpts{}).Observe(10 * 
float64(1))
+                       time.Sleep(1 * time.Second)
+               }
+       }()
+       p.Export()
+       // test push
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go func() {
+               err := 
http.ListenAndServe(url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, ""),
+                       http.HandlerFunc(func(w http.ResponseWriter, r 
*http.Request) {
+                               bodyBytes, err := io.ReadAll(r.Body)
+                               assert.Nil(t, err)
+                               text := string(bodyBytes)
+                               assert.Contains(t, text, "dubbo_request_avg")
+                               wg.Done()
+                       }))
+               assert.Nil(t, err)
+       }()
+       timeout := 
url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey, 
constant.PrometheusDefaultPushInterval)
+       if waitTimeout(&wg, time.Duration(timeout+1)*time.Second) {
+               assert.Fail(t, "wait pushgateway data timeout")
+       }
+       // test pull
+       resp, err := http.Get("http://localhost:" +
+               url.GetParam(constant.PrometheusExporterMetricsPortKey, 
constant.PrometheusDefaultMetricsPort) +
+               url.GetParam(constant.PrometheusExporterMetricsPathKey, 
constant.PrometheusDefaultMetricsPath),
+       )
+       assert.Nil(t, err)
+       defer resp.Body.Close()
+       bodyBytes, err := io.ReadAll(resp.Body)
+       assert.Nil(t, err)
+       text := string(bodyBytes)
+       assert.Contains(t, text, "dubbo_request_avg")
+}
+
+func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+       c := make(chan struct{})
+       go func() {
+               defer close(c)
+               wg.Wait()
+       }()
+       select {
+       case <-c:
+               return false // completed normally
+       case <-time.After(timeout):
+               return true // timed out
+       }
+}
diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go
deleted file mode 100644
index a5e88eff2..000000000
--- a/metrics/prometheus/reporter.go
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package prometheus
-
-import (
-       "context"
-       "net/http"
-       "sync"
-)
-
-import (
-       "github.com/dubbogo/gost/log/logger"
-
-       "github.com/prometheus/client_golang/prometheus/promhttp"
-)
-
-import (
-       "dubbo.apache.org/dubbo-go/v3/common/extension"
-       "dubbo.apache.org/dubbo-go/v3/metrics"
-)
-
-var (
-       reporterInstance *reporter
-       reporterInitOnce sync.Once
-)
-
-const (
-       reporterName = "prometheus"
-)
-
-// should initialize after loading configuration
-func init() {
-       // newPrometheusReporter()
-       extension.SetMetricReporter(reporterName, newPrometheusReporter)
-}
-
-// reporter will export the metrics to Prometheus
-// if you want to use this feature, you need to initialize your prometheus.
-// https://prometheus.io/docs/guides/go-application/
-type reporter struct {
-       reporterServer *http.Server
-       reporterConfig *metrics.ReporterConfig
-       namespace      string
-}
-
-// newPrometheusReporter create a new prometheus server or push gateway 
reporter
-func newPrometheusReporter(reporterConfig *metrics.ReporterConfig) 
metrics.Reporter {
-       if reporterInstance == nil {
-               reporterInitOnce.Do(func() {
-                       reporterInstance = &reporter{
-                               reporterConfig: reporterConfig,
-                               namespace:      reporterConfig.Namespace,
-                       }
-               })
-       }
-
-       if reporterConfig.Enable {
-               if reporterConfig.Mode == metrics.ReportModePull {
-                       go reporterInstance.StartServer(reporterConfig)
-               }
-               // todo pushgateway support
-       } else {
-               reporterInstance.ShutdownServer()
-       }
-
-       return reporterInstance
-}
-
-func (r *reporter) StartServer(reporterConfig *metrics.ReporterConfig) {
-       // start server
-       mux := http.NewServeMux()
-       mux.Handle(reporterConfig.Path, promhttp.Handler())
-       reporterInstance.reporterServer = &http.Server{Addr: ":" + 
reporterConfig.Port, Handler: mux}
-       logger.Infof("new prometheus reporter with port = %s, path = %s", 
reporterConfig.Port, reporterConfig.Path)
-       if err := reporterInstance.reporterServer.ListenAndServe(); err != nil {
-               logger.Warnf("new prometheus reporter with error = %s", err)
-       }
-}
-
-func (r *reporter) ShutdownServer() {
-       if reporterInstance.reporterServer != nil {
-               err := 
reporterInstance.reporterServer.Shutdown(context.Background())
-               if err != nil {
-                       logger.Errorf("shutdown prometheus reporter with error 
= %s, prometheus reporter close now", err)
-                       reporterInstance.reporterServer.Close()
-               }
-       }
-}

Reply via email to