This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push:
new 40dd19851 add switch for metric collector (#2424)
40dd19851 is described below
commit 40dd198515bfcf7d50b5fdb08fe138f1e2957754
Author: foghost <[email protected]>
AuthorDate: Tue Oct 24 11:30:02 2023 +0800
add switch for metric collector (#2424)
---
common/constant/key.go | 4 ++
config/metric_config.go | 44 ++++++++++++++----
config/metric_config_test.go | 18 ++++++--
metrics/config_center/collector.go | 8 ++--
metrics/metadata/collector.go | 8 ++--
metrics/prometheus/registry.go | 94 ++++++++++++++++++++------------------
metrics/registry/collector.go | 8 ++--
metrics/rpc/collector.go | 12 +++--
8 files changed, 125 insertions(+), 71 deletions(-)
diff --git a/common/constant/key.go b/common/constant/key.go
index 4d0b0a8a3..c85384d43 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -404,6 +404,10 @@ const (
// metrics key
const (
+ MetadataEnabledKey = "metrics.metadata.enabled"
+ RegistryEnabledKey = "metrics.registry.enabled"
+ ConfigCenterEnabledKey = "metrics.config-center.enabled"
+ RpcEnabledKey = "metrics.rpc.enabled"
AggregationEnabledKey = "aggregation.enabled"
AggregationBucketNumKey = "aggregation.bucket.num"
AggregationTimeWindowSecondsKey = "aggregation.time.window.seconds"
diff --git a/config/metric_config.go b/config/metric_config.go
index 0859b57cc..af41eb853 100644
--- a/config/metric_config.go
+++ b/config/metric_config.go
@@ -35,13 +35,17 @@ import (
// MetricConfig This is the config struct for all metrics implementation
type MetricConfig struct {
- Enable *bool `default:"false" yaml:"enable"
json:"enable,omitempty" property:"enable"`
- Port string `default:"9090" yaml:"port"
json:"port,omitempty" property:"port"`
- Path string `default:"/metrics" yaml:"path"
json:"path,omitempty" property:"path"`
- Protocol string `default:"prometheus" yaml:"protocol"
json:"protocol,omitempty" property:"protocol"`
- Prometheus *PrometheusConfig `yaml:"prometheus" json:"prometheus"
property:"prometheus"`
- Aggregation *AggregateConfig `yaml:"aggregation" json:"aggregation"
property:"aggregation"`
- rootConfig *RootConfig
+ Enable *bool `default:"false" yaml:"enable"
json:"enable,omitempty" property:"enable"`
+ Port string `default:"9090" yaml:"port"
json:"port,omitempty" property:"port"`
+ Path string `default:"/metrics" yaml:"path"
json:"path,omitempty" property:"path"`
+ Protocol string `default:"prometheus"
yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
+ EnableMetadata *bool `default:"true"
yaml:"enable-metadata" json:"enable-metadata,omitempty"
property:"enable-metadata"`
+ EnableRegistry *bool `default:"true"
yaml:"enable-registry" json:"enable-registry,omitempty"
property:"enable-registry"`
+ EnableConfigCenter *bool `default:"true"
yaml:"enable-config-center" json:"enable-config-center,omitempty"
property:"enable-config-center"`
+ EnableRpc *bool `default:"true" yaml:"enable-rpc"
json:"enable-rpc,omitempty" property:"enable-rpc"`
+ Prometheus *PrometheusConfig `yaml:"prometheus"
json:"prometheus" property:"prometheus"`
+ Aggregation *AggregateConfig `yaml:"aggregation"
json:"aggregation" property:"aggregation"`
+ rootConfig *RootConfig
}
type AggregateConfig struct {
@@ -101,6 +105,26 @@ func NewMetricConfigBuilder() *MetricConfigBuilder {
return &MetricConfigBuilder{metricConfig: &MetricConfig{}}
}
+func (mcb *MetricConfigBuilder) SetMetadataEnabled(enabled bool)
*MetricConfigBuilder {
+ mcb.metricConfig.EnableMetadata = &enabled
+ return mcb
+}
+
+func (mcb *MetricConfigBuilder) SetRegistryEnabled(enabled bool)
*MetricConfigBuilder {
+ mcb.metricConfig.EnableRegistry = &enabled
+ return mcb
+}
+
+func (mcb *MetricConfigBuilder) SetConfigCenterEnabled(enabled bool)
*MetricConfigBuilder {
+ mcb.metricConfig.EnableConfigCenter = &enabled
+ return mcb
+}
+
+func (mcb *MetricConfigBuilder) SetRpcEnabled(enabled bool)
*MetricConfigBuilder {
+ mcb.metricConfig.EnableRpc = &enabled
+ return mcb
+}
+
func (mcb *MetricConfigBuilder) Build() *MetricConfig {
return mcb.metricConfig
}
@@ -113,11 +137,15 @@ func (mc *MetricConfig)
DynamicUpdateProperties(newMetricConfig *MetricConfig) {
//
prometheus://localhost:9090?&histogram.enabled=false&prometheus.exporter.enabled=false
func (mc *MetricConfig) toURL() *common.URL {
url, _ := common.NewURL("localhost", common.WithProtocol(mc.Protocol))
- url.SetParam(constant.PrometheusExporterEnabledKey,
strconv.FormatBool(*mc.Enable))
+ url.SetParam(constant.PrometheusExporterEnabledKey,
strconv.FormatBool(*mc.Enable)) // for compatibility
url.SetParam(constant.PrometheusExporterMetricsPortKey, mc.Port)
url.SetParam(constant.PrometheusExporterMetricsPathKey, mc.Path)
url.SetParam(constant.ApplicationKey, mc.rootConfig.Application.Name)
url.SetParam(constant.AppVersionKey, mc.rootConfig.Application.Version)
+ url.SetParam(constant.MetadataEnabledKey,
strconv.FormatBool(*mc.EnableMetadata))
+ url.SetParam(constant.RegistryEnabledKey,
strconv.FormatBool(*mc.EnableRegistry))
+ url.SetParam(constant.ConfigCenterEnabledKey,
strconv.FormatBool(*mc.EnableConfigCenter))
+ url.SetParam(constant.RpcEnabledKey, strconv.FormatBool(*mc.EnableRpc))
if mc.Aggregation != nil {
url.SetParam(constant.AggregationEnabledKey,
strconv.FormatBool(*mc.Aggregation.Enabled))
url.SetParam(constant.AggregationBucketNumKey,
strconv.Itoa(mc.Aggregation.BucketNum))
diff --git a/config/metric_config_test.go b/config/metric_config_test.go
index 70dce11b1..31a0ac6b7 100644
--- a/config/metric_config_test.go
+++ b/config/metric_config_test.go
@@ -26,9 +26,17 @@ import (
)
func TestMetricConfigBuilder(t *testing.T) {
- config := NewMetricConfigBuilder().Build()
- err := config.Init(&RootConfig{Application: &ApplicationConfig{Name:
"dubbo", Version: "1.0.0"}})
- assert.NoError(t, err)
- reporterConfig := config.ToReporterConfig()
- assert.Equal(t, string(reporterConfig.Mode), "pull")
+ config := NewMetricConfigBuilder().
+ SetConfigCenterEnabled(false).
+ SetMetadataEnabled(false).
+ SetRegistryEnabled(false).
+ SetRpcEnabled(false).
+ Build()
+ enable := false
+ assert.Equal(t, &MetricConfig{
+ EnableConfigCenter: &enable,
+ EnableMetadata: &enable,
+ EnableRegistry: &enable,
+ EnableRpc: &enable,
+ }, config)
}
diff --git a/metrics/config_center/collector.go
b/metrics/config_center/collector.go
index 9ae551f0e..3b4bb1e49 100644
--- a/metrics/config_center/collector.go
+++ b/metrics/config_center/collector.go
@@ -30,9 +30,11 @@ var ch = make(chan metrics.MetricsEvent, 10)
var info = metrics.NewMetricKey("dubbo_configcenter_total", "Config Changed
Total")
func init() {
- metrics.AddCollector("config_center", func(mr metrics.MetricRegistry, _
*common.URL) {
- c := &configCenterCollector{r: mr}
- c.start()
+ metrics.AddCollector("config_center", func(mr metrics.MetricRegistry,
url *common.URL) {
+ if url.GetParamBool(constant.ConfigCenterEnabledKey, true) {
+ c := &configCenterCollector{r: mr}
+ c.start()
+ }
})
}
diff --git a/metrics/metadata/collector.go b/metrics/metadata/collector.go
index 7125fb1f1..8a08e0ff4 100644
--- a/metrics/metadata/collector.go
+++ b/metrics/metadata/collector.go
@@ -32,9 +32,11 @@ const eventType = constant.MetricsMetadata
var ch = make(chan metrics.MetricsEvent, 10)
func init() {
- metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, _
*common.URL) {
- l := &MetadataMetricCollector{metrics.BaseCollector{R: mr}}
- l.start()
+ metrics.AddCollector("metadata", func(mr metrics.MetricRegistry, url
*common.URL) {
+ if url.GetParamBool(constant.MetadataEnabledKey, true) {
+ l := &MetadataMetricCollector{metrics.BaseCollector{R:
mr}}
+ l.start()
+ }
})
}
diff --git a/metrics/prometheus/registry.go b/metrics/prometheus/registry.go
index f84f2ad55..d108f8f9f 100644
--- a/metrics/prometheus/registry.go
+++ b/metrics/prometheus/registry.go
@@ -143,56 +143,62 @@ func (p *promMetricRegistry) Rt(m *metrics.MetricId, opts
*metrics.RtOpts) metri
func (p *promMetricRegistry) Export() {
if p.url.GetParamBool(constant.PrometheusExporterEnabledKey, false) {
- go func() {
- mux := http.NewServeMux()
- path :=
p.url.GetParam(constant.PrometheusDefaultMetricsPath,
constant.PrometheusDefaultMetricsPath)
- port :=
p.url.GetParam(constant.PrometheusExporterMetricsPortKey,
constant.PrometheusDefaultMetricsPort)
- mux.Handle(path, promhttp.InstrumentMetricHandler(p.r,
promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{})))
- srv := &http.Server{Addr: ":" + port, Handler: mux}
- extension.AddCustomShutdownCallback(func() {
- ctx, cancel :=
context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- if err := srv.Shutdown(ctx); nil != err {
- logger.Fatalf("prometheus server
shutdown failed, err: %v", err)
- } else {
- logger.Info("prometheus server
gracefully shutdown success")
- }
- })
- logger.Infof("prometheus endpoint :%s%s", port, path)
- if err := srv.ListenAndServe(); err != nil && err !=
http.ErrServerClosed { // except Shutdown or Close
- logger.Errorf("new prometheus server with error
= %v", err)
- }
- }()
+ go p.exportHttp()
}
if p.url.GetParamBool(constant.PrometheusPushgatewayEnabledKey, false) {
- baseUrl, exist :=
p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey)
- if !exist {
- logger.Error("no pushgateway url found in config path:
metrics.prometheus.pushgateway.bash-url, please check your config file")
- return
- }
- username :=
p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
- password :=
p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
- job := p.url.GetParam(constant.PrometheusPushgatewayJobKey,
constant.PrometheusDefaultJobName)
- pushInterval :=
p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey,
constant.PrometheusDefaultPushInterval)
- pusher := push.New(baseUrl, job).Gatherer(p.gather)
- if len(username) != 0 {
- pusher.BasicAuth(username, password)
+ p.exportPushgateway()
+ }
+}
+
+func (p *promMetricRegistry) exportHttp() {
+ mux := http.NewServeMux()
+ path := p.url.GetParam(constant.PrometheusDefaultMetricsPath,
constant.PrometheusDefaultMetricsPath)
+ port := p.url.GetParam(constant.PrometheusExporterMetricsPortKey,
constant.PrometheusDefaultMetricsPort)
+ mux.Handle(path, promhttp.InstrumentMetricHandler(p.r,
promhttp.HandlerFor(p.gather, promhttp.HandlerOpts{})))
+ srv := &http.Server{Addr: ":" + port, Handler: mux}
+ extension.AddCustomShutdownCallback(func() {
+ ctx, cancel := context.WithTimeout(context.Background(),
5*time.Second)
+ defer cancel()
+ if err := srv.Shutdown(ctx); nil != err {
+ logger.Fatalf("prometheus server shutdown failed, err:
%v", err)
+ } else {
+ logger.Info("prometheus server gracefully shutdown
success")
}
- logger.Infof("prometheus pushgateway will push to %s every %d
seconds", baseUrl, pushInterval)
- ticker := time.NewTicker(time.Duration(pushInterval) *
time.Second)
- go func() {
- for range ticker.C {
- err := pusher.Add()
- if err != nil {
- logger.Errorf("push metric data to
prometheus push gateway error", err)
- } else {
- logger.Debugf("prometheus pushgateway
push to %s success", baseUrl)
- }
- }
- }()
+ })
+ logger.Infof("prometheus endpoint :%s%s", port, path)
+ if err := srv.ListenAndServe(); err != nil && err !=
http.ErrServerClosed { // except Shutdown or Close
+ logger.Errorf("new prometheus server with error: %v", err)
}
}
+func (p *promMetricRegistry) exportPushgateway() {
+ baseUrl, exist :=
p.url.GetNonDefaultParam(constant.PrometheusPushgatewayBaseUrlKey)
+ if !exist {
+ logger.Error("no pushgateway base url found in config path:
metrics.prometheus.pushgateway.base-url, please check your config")
+ return
+ }
+ username := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
+ password := p.url.GetParam(constant.PrometheusPushgatewayBaseUrlKey, "")
+ job := p.url.GetParam(constant.PrometheusPushgatewayJobKey,
constant.PrometheusDefaultJobName)
+ pushInterval :=
p.url.GetParamByIntValue(constant.PrometheusPushgatewayPushIntervalKey,
constant.PrometheusDefaultPushInterval)
+ pusher := push.New(baseUrl, job).Gatherer(p.gather)
+ if len(username) != 0 {
+ pusher.BasicAuth(username, password)
+ }
+ logger.Infof("prometheus pushgateway will push to %s every %d seconds",
baseUrl, pushInterval)
+ ticker := time.NewTicker(time.Duration(pushInterval) * time.Second)
+ go func() {
+ for range ticker.C {
+ err := pusher.Add()
+ if err != nil {
+ logger.Errorf("push metric data to prometheus
pushgateway error: %v", err)
+ } else {
+ logger.Debugf("prometheus pushgateway push to
%s success", baseUrl)
+ }
+ }
+ }()
+}
+
func (p *promMetricRegistry) Scrape() (string, error) {
gathering, err := p.gather.Gather()
if err != nil {
diff --git a/metrics/registry/collector.go b/metrics/registry/collector.go
index 53a5d71b3..871dd469b 100644
--- a/metrics/registry/collector.go
+++ b/metrics/registry/collector.go
@@ -28,9 +28,11 @@ var (
)
func init() {
- metrics.AddCollector("registry", func(m metrics.MetricRegistry, _
*common.URL) {
- rc := &registryCollector{metrics.BaseCollector{R: m}}
- go rc.start()
+ metrics.AddCollector("registry", func(m metrics.MetricRegistry, url
*common.URL) {
+ if url.GetParamBool(constant.RegistryEnabledKey, true) {
+ rc := &registryCollector{metrics.BaseCollector{R: m}}
+ go rc.start()
+ }
})
}
diff --git a/metrics/rpc/collector.go b/metrics/rpc/collector.go
index dc9fb5334..d9e9f0551 100644
--- a/metrics/rpc/collector.go
+++ b/metrics/rpc/collector.go
@@ -33,12 +33,14 @@ var (
// init will add the rpc collectorFunc to metrics.collectors slice, and lazy
start the rpc collector goroutine
func init() {
- collectorFunc := func(registry metrics.MetricRegistry, c *common.URL) {
- rc := &rpcCollector{
- registry: registry,
- metricSet: buildMetricSet(registry),
+ collectorFunc := func(registry metrics.MetricRegistry, url *common.URL)
{
+ if url.GetParamBool(constant.RpcEnabledKey, true) {
+ rc := &rpcCollector{
+ registry: registry,
+ metricSet: buildMetricSet(registry),
+ }
+ go rc.start()
}
- go rc.start()
}
metrics.AddCollector("rpc", collectorFunc)