This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 40dd19851 add switch for metric collector (#2424)
40dd19851 is described below

commit 40dd198515bfcf7d50b5fdb08fe138f1e2957754
Author: foghost <[email protected]>
AuthorDate: Tue Oct 24 11:30:02 2023 +0800

    add switch for metric collector (#2424)
---
 common/constant/key.go             |  4 ++
 config/metric_config.go            | 44 ++++++++++++++----
 config/metric_config_test.go       | 18 ++++++--
 metrics/config_center/collector.go |  8 ++--
 metrics/metadata/collector.go      |  8 ++--
 metrics/prometheus/registry.go     | 94 ++++++++++++++++++++------------------
 metrics/registry/collector.go      |  8 ++--
 metrics/rpc/collector.go           | 12 +++--
 8 files changed, 125 insertions(+), 71 deletions(-)

diff --git a/common/constant/key.go b/common/constant/key.go
index 4d0b0a8a3..c85384d43 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -404,6 +404,10 @@ const (
 
 // metrics key
 const (
+       MetadataEnabledKey                   = "metrics.metadata.enabled"
+       RegistryEnabledKey                   = "metrics.registry.enabled"
+       ConfigCenterEnabledKey               = "metrics.config-center.enabled"
+       RpcEnabledKey                        = "metrics.rpc.enabled"
        AggregationEnabledKey                = "aggregation.enabled"
        AggregationBucketNumKey              = "aggregation.bucket.num"
        AggregationTimeWindowSecondsKey      = "aggregation.time.window.seconds"
diff --git a/config/metric_config.go b/config/metric_config.go
index 0859b57cc..af41eb853 100644
--- a/config/metric_config.go
+++ b/config/metric_config.go
@@ -35,13 +35,17 @@ import (
 
 // MetricConfig This is the config struct for all metrics implementation
 type MetricConfig struct {
-       Enable      *bool             `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"`
-       Port        string            `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
-       Path        string            `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
-       Protocol    string            `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
-       Prometheus  *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"`
-       Aggregation *AggregateConfig  `yaml:"aggregation" json:"aggregation" property:"aggregation"`
-       rootConfig  *RootConfig
+       Enable             *bool             `default:"false" yaml:"enable" json:"enable,omitempty" property:"enable"`
+       Port               string            `default:"9090" yaml:"port" json:"port,omitempty" property:"port"`
+       Path               string            `default:"/metrics" yaml:"path" json:"path,omitempty" property:"path"`
+       Protocol           string            `default:"prometheus" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
+       EnableMetadata     *bool             `default:"true" yaml:"enable-metadata" json:"enable-metadata,omitempty" property:"enable-metadata"`
+       EnableRegistry     *bool             `default:"true" yaml:"enable-registry" json:"enable-registry,omitempty" property:"enable-registry"`
+       EnableConfigCenter *bool             `default:"true" yaml:"enable-config-center" json:"enable-config-center,omitempty" property:"enable-config-center"`
+       EnableRpc          *bool             `default:"true" yaml:"enable-rpc" json:"enable-rpc,omitempty" property:"enable-rpc"`
+       Prometheus         *PrometheusConfig `yaml:"prometheus" json:"prometheus" property:"prometheus"`
+       Aggregation        *AggregateConfig  `yaml:"aggregation" json:"aggregation" property:"aggregation"`
+       rootConfig         *RootConfig
 }
 
 type AggregateConfig struct {
@@ -101,6 +105,26 @@ func NewMetricConfigBuilder() *MetricConfigBuilder {
        return &MetricConfigBuilder{metricConfig: &MetricConfig{}}
 }
 
+func (mcb *MetricConfigBuilder) SetMetadataEnabled(enabled bool) *MetricConfigBuilder {
+       mcb.metricConfig.EnableMetadata = &enabled
+       return mcb
+}
+
+func (mcb *MetricConfigBuilder) SetRegistryEnabled(enabled bool) *MetricConfigBuilder {
+       mcb.metricConfig.EnableRegistry = &enabled
+       return mcb
+}
+
+func (mcb *MetricConfigBuilder) SetConfigCenterEnabled(enabled bool) *MetricConfigBuilder {
+       mcb.metricConfig.EnableConfigCenter = &enabled
+       return mcb
+}
+
+func (mcb *MetricConfigBuilder) SetRpcEnabled(enabled bool) *MetricConfigBuilder {
+       mcb.metricConfig.EnableRpc = &enabled
+       return mcb
+}
+
 func (mcb *MetricConfigBuilder) Build() *MetricConfig {
        return mcb.metricConfig
 }
@@ -113,11 +137,15 @@ func (mc *MetricConfig) DynamicUpdateProperties(newMetricConfig *MetricConfig) {
 // prometheus://localhost:9090?&histogram.enabled=false&prometheus.exporter.enabled=false
 func (mc *MetricConfig) toURL() *common.URL {
        url, _ := common.NewURL("localhost", common.WithProtocol(mc.Protocol))
-       url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*mc.Enable))
+       url.SetParam(constant.PrometheusExporterEnabledKey, strconv.FormatBool(*mc.Enable)) // for compatibility
        url.SetParam(constant.PrometheusExporterMetricsPortKey, mc.Port)
        url.SetParam(constant.PrometheusExporterMetricsPathKey, mc.Path)
        url.SetParam(constant.ApplicationKey, mc.rootConfig.Application.Name)
        url.SetParam(constant.AppVersionKey, mc.rootConfig.Application.Version)
+       url.SetParam(constant.MetadataEnabledKey, strconv.FormatBool(*mc.EnableMetadata))
+       url.SetParam(constant.RegistryEnabledKey, strconv.FormatBool(*mc.EnableRegistry))
+       url.SetParam(constant.ConfigCenterEnabledKey, strconv.FormatBool(*mc.EnableConfigCenter))
+       url.SetParam(constant.RpcEnabledKey, strconv.FormatBool(*mc.EnableRpc))
        if mc.Aggregation != nil {
                url.SetParam(constant.AggregationEnabledKey, strconv.FormatBool(*mc.Aggregation.Enabled))
                url.SetParam(constant.AggregationBucketNumKey, strconv.Itoa(mc.Aggregation.BucketNum))
diff --git a/config/metric_config_test.go b/config/metric_config_test.go
index 70dce11b1..31a0ac6b7 100644
--- a/config/metric_config_test.go
+++ b/config/metric_config_test.go
@@ -26,9 +26,17 @@ import (
 )
 
 func TestMetricConfigBuilder(t *testing.T) {
-       config := NewMetricConfigBuilder().Build()
-       err := config.Init(&RootConfig{Application: &ApplicationConfig{Name: "dubbo", Version: "1.0.0"}})
-       assert.NoError(t, err)
-       reporterConfig := config.ToReporterConfig()
-       assert.Equal(t, string(reporterConfig.Mode), "pull")
+       config := NewMetricConfigBuilder().
+               SetConfigCenterEnabled(false).
+               SetMetadataEnabled(false).
+               SetRegistryEnabled(false).
+               SetRpcEnabled(false).
+               Build()
+       enable := false
+       assert.Equal(t, &MetricConfig{
+               EnableConfigCenter: &enable,
+               EnableMetadata:     &enable,
+               EnableRegistry:     &enable,
+               EnableRpc:          &enable,
+       }, config)
 }
diff --git a/metrics/config_center/collector.go b/metrics/config_center/collector.go
index 9ae551f0e..3b4bb1e49 100644
--- a/metrics/config_center/collector.go
+++ b/metrics/config_center/collector.go
@@ -30,9 +30,11 @@ var ch = make(chan metrics.MetricsEvent, 10)
 var info = metrics.NewMetricKey("dubbo_configcenter_total", "Config Changed Total")
 
 func init() {
-       metrics.AddCollector("config_center", func(mr metrics.MetricRegistry, _ *common.URL) {
-               c := &configCenterCollector{r: mr}
-               c.start()
+       metrics.AddCollector("config_center", func(mr metrics.MetricRegistry, url *common.URL) {
+               if url.GetParamBool(constant.ConfigCenterEnabledKey, true) {
+                       c := &configCenterCollector{r: mr}
+                       c.start()
+               }
        })
 }
 
diff --git a/metrics/metadata/collector.go b/metrics/metadata/collector.go
index 7125fb1f1..8a08e0ff4 100644
--- a/metrics/metadata/collector.go
+++ b/metrics/metadata/collector.go
@@ -32,9 +32,11 @@ const eventType = constant.MetricsMetadata
 var ch = make(chan metrics.MetricsEvent, 10)
 
 func init() {
-       metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, _ *common.URL) {
-               l := &MetadataMetricCollector{metrics.BaseCollector{R: mr}}
-               l.start()
+       metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, url *common.URL) {
+               if url.GetParamBool(constant.MetadataEnabledKey, true) {
+                       l := &MetadataMetricCollector{metrics.BaseCollector{R: mr}}
+                       l.start()
+               }
        })
 }
 
diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go
index f84f2ad55..d108f8f9f 100644
--- a/metrics/prometheus/registry.go
+++ b/metrics/prometheus/registry.go
@@ -143,56 +143,62 @@ func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts *metrics.RtOpts) metri
 
 func (p *promMetricRegistry) Export() {
        if p.url.GetParamBool(constant.PrometheusExporterEnabledKey, false) {
-               go func() {
-                       mux := http.NewServeMux()
-                       path := p.url.GetParam(constant.PrometheusDefaultMetricsPath, constant.PrometheusDefaultMetricsPath)
-                       port := p.url.GetParam(constant.PrometheusExporterMetricsPortKey, constant.PrometheusDefaultMetricsPort)
-                       mux.Handle(path, promhttp.InstrumentMetricHandler(p.r, promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{})))
-                       srv := &http.Server{Addr: ":" + port, Handler: mux}
-                       extension.AddCustomShutdownCallback(func() {
-                               ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-                               defer cancel()
-                               if err := srv.Shutdown(ctx); nil != err {
-                                       logger.Fatalf("prometheus server shutdown failed, err: %v", err)
-                               } else {
-                                       logger.Info("prometheus server gracefully shutdown success")
-                               }
-                       })
-                       logger.Infof("prometheus endpoint :%s%s", port, path)
-                       if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { // except Shutdown or Close
-                               logger.Errorf("new prometheus server with error = %v", err)
-                       }
-               }()
+               go p.exportHttp()
        }
        if p.url.GetParamBool(constant.PrometheusPushgatewayEnabledKey, false) {
-               baseUrl, exist := p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey)
-               if !exist {
-                       logger.Error("no pushgateway url found in config path: metrics.prometheus.pushgateway.bash-url, please check your config file")
-                       return
-               }
-               username := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
-               password := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
-               job := p.url.GetParam(constant.PrometheusPushgatewayJobKey, constant.PrometheusDefaultJobName)
-               pushInterval := p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey, constant.PrometheusDefaultPushInterval)
-               pusher := push.New(baseUrl, job).Gatherer(p.gather)
-               if len(username) != 0 {
-                       pusher.BasicAuth(username, password)
+               p.exportPushgateway()
+       }
+}
+
+func (p *promMetricRegistry) exportHttp() {
+       mux := http.NewServeMux()
+       path := p.url.GetParam(constant.PrometheusDefaultMetricsPath, constant.PrometheusDefaultMetricsPath)
+       port := p.url.GetParam(constant.PrometheusExporterMetricsPortKey, constant.PrometheusDefaultMetricsPort)
+       mux.Handle(path, promhttp.InstrumentMetricHandler(p.r, promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{})))
+       srv := &http.Server{Addr: ":" + port, Handler: mux}
+       extension.AddCustomShutdownCallback(func() {
+               ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+               defer cancel()
+               if err := srv.Shutdown(ctx); nil != err {
+                       logger.Fatalf("prometheus server shutdown failed, err: %v", err)
+               } else {
+                       logger.Info("prometheus server gracefully shutdown success")
                }
-               logger.Infof("prometheus pushgateway will push to %s every %d seconds", baseUrl, pushInterval)
-               ticker := time.NewTicker(time.Duration(pushInterval) * time.Second)
-               go func() {
-                       for range ticker.C {
-                               err := pusher.Add()
-                               if err != nil {
-                                       logger.Errorf("push metric data to prometheus push gateway error", err)
-                               } else {
-                                       logger.Debugf("prometheus pushgateway push to %s success", baseUrl)
-                               }
-                       }
-               }()
+       })
+       logger.Infof("prometheus endpoint :%s%s", port, path)
+       if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { // except Shutdown or Close
+               logger.Errorf("new prometheus server with error: %v", err)
        }
 }
 
+func (p *promMetricRegistry) exportPushgateway() {
+       baseUrl, exist := p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey)
+       if !exist {
+               logger.Error("no pushgateway base url found in config path: metrics.prometheus.pushgateway.base-url, please check your config")
+               return
+       }
+       username := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
+       password := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
+       job := p.url.GetParam(constant.PrometheusPushgatewayJobKey, constant.PrometheusDefaultJobName)
+       pushInterval := p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey, constant.PrometheusDefaultPushInterval)
+       pusher := push.New(baseUrl, job).Gatherer(p.gather)
+       if len(username) != 0 {
+               pusher.BasicAuth(username, password)
+       }
+       logger.Infof("prometheus pushgateway will push to %s every %d seconds", baseUrl, pushInterval)
+       ticker := time.NewTicker(time.Duration(pushInterval) * time.Second)
+       go func() {
+               for range ticker.C {
+                       err := pusher.Add()
+                       if err != nil {
+                               logger.Errorf("push metric data to prometheus pushgateway error: %v", err)
+                       } else {
+                               logger.Debugf("prometheus pushgateway push to %s success", baseUrl)
+                       }
+               }
+       }()
+}
+
 func (p *promMetricRegistry) Scrape() (string, error) {
        gathering, err := p.gather.Gather()
        if err != nil {
diff --git a/metrics/registry/collector.go b/metrics/registry/collector.go
index 53a5d71b3..871dd469b 100644
--- a/metrics/registry/collector.go
+++ b/metrics/registry/collector.go
@@ -28,9 +28,11 @@ var (
 )
 
 func init() {
-       metrics.AddCollector("registry", func(m metrics.MetricRegistry, _ *common.URL) {
-               rc := &registryCollector{metrics.BaseCollector{R: m}}
-               go rc.start()
+       metrics.AddCollector("registry", func(m metrics.MetricRegistry, url *common.URL) {
+               if url.GetParamBool(constant.RegistryEnabledKey, true) {
+                       rc := &registryCollector{metrics.BaseCollector{R: m}}
+                       go rc.start()
+               }
        })
 }
 
diff --git a/metrics/rpc/collector.go b/metrics/rpc/collector.go
index dc9fb5334..d9e9f0551 100644
--- a/metrics/rpc/collector.go
+++ b/metrics/rpc/collector.go
@@ -33,12 +33,14 @@ var (
 
 // init will add the rpc collectorFunc to metrics.collectors slice, and lazy start the rpc collector goroutine
 func init() {
-       collectorFunc := func(registry metrics.MetricRegistry, c *common.URL) {
-               rc := &rpcCollector{
-                       registry:  registry,
-                       metricSet: buildMetricSet(registry),
+       collectorFunc := func(registry metrics.MetricRegistry, url *common.URL) {
+               if url.GetParamBool(constant.RpcEnabledKey, true) {
+                       rc := &rpcCollector{
+                               registry:  registry,
+                               metricSet: buildMetricSet(registry),
+                       }
+                       go rc.start()
                }
-               go rc.start()
        }
 
        metrics.AddCollector("rpc", collectorFunc)

Reply via email to