This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push:
new 66a152c3d refactor: split metrics module into multiple files (#2310)
66a152c3d is described below
commit 66a152c3d8f8820806872ef4d9bb037df9d7d367
Author: Wang Guan <[email protected]>
AuthorDate: Tue May 30 17:04:44 2023 +0800
refactor: split metrics module into multiple files (#2310)
---
common/constant/key.go | 3 +
common/extension/metrics_test.go | 11 +-
config/metric_config.go | 9 +-
filter/metrics/filter.go | 4 +-
filter/metrics/filter_test.go | 12 +-
metrics/prometheus/after_invocation.go | 78 +++++
metrics/prometheus/api.go | 199 ++++++++++++
.../prometheus/before_invocation.go | 33 +-
metrics/prometheus/common.go | 111 +++++++
.../prometheus/constant.go | 52 ++--
metrics/prometheus/metric_set.go | 63 ++++
metrics/prometheus/reporter.go | 346 +--------------------
metrics/prometheus/reporter_test.go | 13 +-
metrics/reporter.go | 4 +-
14 files changed, 504 insertions(+), 434 deletions(-)
diff --git a/common/constant/key.go b/common/constant/key.go
index 131b200ae..16933a408 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -176,6 +176,9 @@ const (
const (
ApplicationKey = "application"
+ ApplicationNameKey = "application_name"
+ HostnameKey = "hostname"
+ IpKey = "ip"
OrganizationKey = "organization"
NameKey = "name"
ModuleKey = "module"
diff --git a/common/extension/metrics_test.go b/common/extension/metrics_test.go
index c4cdc41a5..47395161d 100644
--- a/common/extension/metrics_test.go
+++ b/common/extension/metrics_test.go
@@ -21,15 +21,10 @@ import (
"context"
"testing"
"time"
-)
-import (
- "github.com/stretchr/testify/assert"
-)
-
-import (
"dubbo.apache.org/dubbo-go/v3/metrics"
"dubbo.apache.org/dubbo-go/v3/protocol"
+ "github.com/stretchr/testify/assert"
)
func TestGetMetricReporter(t *testing.T) {
@@ -44,6 +39,6 @@ func TestGetMetricReporter(t *testing.T) {
type mockReporter struct{}
-// Report method for feature expansion
-func (m mockReporter) Report(ctx context.Context, invoker protocol.Invoker,
invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
+// implement the interface of Reporter
+func (m mockReporter) ReportAfterInvocation(ctx context.Context, invoker
protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res
protocol.Result) {
}
diff --git a/config/metric_config.go b/config/metric_config.go
index 14d7b7271..b47d48a56 100644
--- a/config/metric_config.go
+++ b/config/metric_config.go
@@ -22,19 +22,16 @@ import (
"github.com/dubbogo/gost/log/logger"
- "github.com/pkg/errors"
-)
-
-import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metrics"
+ "github.com/pkg/errors"
)
// MetricConfig This is the config struct for all metrics implementation
type MetricConfig struct {
Mode string `default:"pull" yaml:"mode"
json:"mode,omitempty" property:"mode"` // push or pull,
Namespace string `default:"dubbo" yaml:"namespace"
json:"namespace,omitempty" property:"namespace"`
- Enable bool `default:"true" yaml:"enable"
json:"enable,omitempty" property:"enable"`
+ Enable *bool `default:"true" yaml:"enable"
json:"enable,omitempty" property:"enable"`
Port string `default:"9090" yaml:"port"
json:"port,omitempty" property:"port"`
Path string `default:"/metrics" yaml:"path"
json:"path,omitempty" property:"path"`
PushGatewayAddress string `default:"" yaml:"push-gateway-address"
json:"push-gateway-address,omitempty" property:"push-gateway-address"`
@@ -50,7 +47,7 @@ func (mc *MetricConfig) ToReporterConfig()
*metrics.ReporterConfig {
defaultMetricsReportConfig.Namespace = mc.Namespace
}
- defaultMetricsReportConfig.Enable = mc.Enable
+ defaultMetricsReportConfig.Enable = *mc.Enable
defaultMetricsReportConfig.Port = mc.Port
defaultMetricsReportConfig.Path = mc.Path
defaultMetricsReportConfig.PushGatewayAddress = mc.PushGatewayAddress
diff --git a/filter/metrics/filter.go b/filter/metrics/filter.go
index ba33cc67a..9782c0d4e 100644
--- a/filter/metrics/filter.go
+++ b/filter/metrics/filter.go
@@ -21,9 +21,7 @@ package metrics
import (
"context"
"time"
-)
-import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/filter"
@@ -52,7 +50,7 @@ func (p *Filter) Invoke(ctx context.Context, invoker
protocol.Invoker, invocatio
duration := end.Sub(start)
go func() {
for _, reporter := range p.reporters {
- reporter.Report(ctx, invoker, invocation, duration, res)
+ reporter.ReportAfterInvocation(ctx, invoker,
invocation, duration, res)
}
}()
return res
diff --git a/filter/metrics/filter_test.go b/filter/metrics/filter_test.go
index 5b7b6cdaf..c12247d47 100644
--- a/filter/metrics/filter_test.go
+++ b/filter/metrics/filter_test.go
@@ -22,17 +22,13 @@ import (
"sync"
"testing"
"time"
-)
-
-import (
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/mock"
-)
-import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metrics"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/mock"
+
_ "dubbo.apache.org/dubbo-go/v3/metrics/prometheus"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
@@ -76,7 +72,7 @@ type mockReporter struct {
wg sync.WaitGroup
}
-func (m *mockReporter) Report(ctx context.Context, invoker protocol.Invoker,
invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
+func (m *mockReporter) ReportAfterInvocation(ctx context.Context, invoker
protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res
protocol.Result) {
m.Called(ctx, invoker, invocation)
m.wg.Done()
}
diff --git a/metrics/prometheus/after_invocation.go
b/metrics/prometheus/after_invocation.go
new file mode 100644
index 000000000..eed549da7
--- /dev/null
+++ b/metrics/prometheus/after_invocation.go
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prometheus
+
+import (
+ "context"
+ "time"
+
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
+ "github.com/dubbogo/gost/log/logger"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+func (reporter *PrometheusReporter) ReportAfterInvocation(ctx context.Context,
invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration,
res protocol.Result) {
+ if !reporter.reporterConfig.Enable {
+ return
+ }
+
+ url := invoker.GetURL()
+
+ var role string // provider or consumer
+ if isProvider(url) {
+ role = providerField
+ } else if isConsumer(url) {
+ role = consumerField
+ } else {
+ logger.Warnf("The url belongs neither the consumer nor the
provider, "+
+ "so the invocation will be ignored. url: %s",
url.String())
+ return
+ }
+ labels := prometheus.Labels{
+ applicationNameKey: url.GetParam(constant.ApplicationKey, ""),
+ groupKey: url.Group(),
+ hostnameKey: "",
+ interfaceKey: url.Service(),
+ ipKey: common.GetLocalIp(),
+ versionKey: url.GetParam(constant.AppVersionKey, ""),
+ methodKey: invocation.MethodName(),
+ }
+
+ reporter.reportRTSummaryVec(role, &labels, cost.Milliseconds())
+ reporter.reportRequestTotalCounterVec(role, &labels)
+}
+
+func (r *PrometheusReporter) reportRTSummaryVec(role string, labels
*prometheus.Labels, costMs int64) {
+ switch role {
+ case providerField:
+ r.providerRTSummaryVec.With(*labels).Observe(float64(costMs))
+ case consumerField:
+ r.consumerRTSummaryVec.With(*labels).Observe(float64(costMs))
+ }
+}
+
+func (r *PrometheusReporter) reportRequestTotalCounterVec(role string, labels
*prometheus.Labels) {
+ switch role {
+ case providerField:
+ r.providerRequestTotalCounterVec.With(*labels).Inc()
+ case consumerField:
+ r.consumerRequestTotalCounterVec.With(*labels).Inc()
+ }
+}
diff --git a/metrics/prometheus/api.go b/metrics/prometheus/api.go
new file mode 100644
index 000000000..eba61e17a
--- /dev/null
+++ b/metrics/prometheus/api.go
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prometheus
+
+import (
+ "sync"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+type syncMaps struct {
+ userGauge sync.Map
+ userSummary sync.Map
+ userCounter sync.Map
+ userCounterVec sync.Map
+ userGaugeVec sync.Map
+ userSummaryVec sync.Map
+}
+
+// setGauge set gauge to target value with given label, if label is not empty,
set gauge vec
+// if target gauge/gaugevec not exist, just create new gauge and set the value
+func (reporter *PrometheusReporter) setGauge(gaugeName string, toSetValue
float64, labelMap prometheus.Labels) {
+ if len(labelMap) == 0 {
+ // gauge
+ if val, exist := reporter.userGauge.Load(gaugeName); !exist {
+ gauge := newGauge(gaugeName, reporter.namespace)
+ err := prometheus.DefaultRegisterer.Register(gauge)
+ if err == nil {
+ reporter.userGauge.Store(gaugeName, gauge)
+ gauge.Set(toSetValue)
+ } else if are, ok :=
err.(prometheus.AlreadyRegisteredError); ok {
+ // A gauge for that metric has been registered
before.
+ // Use the old gauge from now on.
+
are.ExistingCollector.(prometheus.Gauge).Set(toSetValue)
+ }
+
+ } else {
+ val.(prometheus.Gauge).Set(toSetValue)
+ }
+ return
+ }
+
+ // gauge vec
+ if val, exist := reporter.userGaugeVec.Load(gaugeName); !exist {
+ keyList := make([]string, 0)
+ for k := range labelMap {
+ keyList = append(keyList, k)
+ }
+ gaugeVec := newGaugeVec(gaugeName, reporter.namespace, keyList)
+ err := prometheus.DefaultRegisterer.Register(gaugeVec)
+ if err == nil {
+ reporter.userGaugeVec.Store(gaugeName, gaugeVec)
+ gaugeVec.With(labelMap).Set(toSetValue)
+ } else if are, ok := err.(prometheus.AlreadyRegisteredError);
ok {
+ // A gauge for that metric has been registered before.
+ // Use the old gauge from now on.
+
are.ExistingCollector.(*prometheus.GaugeVec).With(labelMap).Set(toSetValue)
+ }
+ } else {
+ val.(*prometheus.GaugeVec).With(labelMap).Set(toSetValue)
+ }
+}
+
+// incCounter increments the counter; if label is not empty, use the counter vec
+// if target counter/counterVec does not exist, just create a new one and inc the
value
+func (reporter *PrometheusReporter) incCounter(counterName string, labelMap
prometheus.Labels) {
+ if len(labelMap) == 0 {
+ // counter
+ if val, exist := reporter.userCounter.Load(counterName); !exist
{
+ counter := newCounter(counterName, reporter.namespace)
+ err := prometheus.DefaultRegisterer.Register(counter)
+ if err == nil {
+ reporter.userCounter.Store(counterName, counter)
+ counter.Inc()
+ } else if are, ok :=
err.(prometheus.AlreadyRegisteredError); ok {
+ // A counter for that metric has been
registered before.
+ // Use the old counter from now on.
+ are.ExistingCollector.(prometheus.Counter).Inc()
+ }
+ } else {
+ val.(prometheus.Counter).Inc()
+ }
+ return
+ }
+
+ // counter vec inc
+ if val, exist := reporter.userCounterVec.Load(counterName); !exist {
+ keyList := make([]string, 0)
+ for k := range labelMap {
+ keyList = append(keyList, k)
+ }
+ counterVec := newCounterVec(counterName, reporter.namespace,
keyList)
+ err := prometheus.DefaultRegisterer.Register(counterVec)
+ if err == nil {
+ reporter.userCounterVec.Store(counterName, counterVec)
+ counterVec.With(labelMap).Inc()
+ } else if are, ok := err.(prometheus.AlreadyRegisteredError);
ok {
+ // A counter for that metric has been registered before.
+ // Use the old counter from now on.
+
are.ExistingCollector.(*prometheus.CounterVec).With(labelMap).Inc()
+ }
+ } else {
+ val.(*prometheus.CounterVec).With(labelMap).Inc()
+ }
+}
+
+// incSummary observes the given value on the summary; if label is not
empty, use the summary vec
+// if target summary/summaryVec does not exist, just create a new one and observe the
value
+func (reporter *PrometheusReporter) incSummary(summaryName string, toSetValue
float64, labelMap prometheus.Labels) {
+ if len(labelMap) == 0 {
+ // summary
+ if val, exist := reporter.userSummary.Load(summaryName); !exist
{
+ summary := newSummary(summaryName, reporter.namespace)
+ err := prometheus.DefaultRegisterer.Register(summary)
+ if err == nil {
+ reporter.userSummary.Store(summaryName, summary)
+ summary.Observe(toSetValue)
+ } else if are, ok :=
err.(prometheus.AlreadyRegisteredError); ok {
+ // A summary for that metric has been
registered before.
+ // Use the old summary from now on.
+
are.ExistingCollector.(prometheus.Summary).Observe(toSetValue)
+ }
+ } else {
+ val.(prometheus.Summary).Observe(toSetValue)
+ }
+ return
+ }
+
+ // summary vec
+ if val, exist := reporter.userSummaryVec.Load(summaryName); !exist {
+ keyList := make([]string, 0)
+ for k := range labelMap {
+ keyList = append(keyList, k)
+ }
+ summaryVec := newSummaryVec(summaryName, reporter.namespace,
keyList, reporter.reporterConfig.SummaryMaxAge)
+ err := prometheus.DefaultRegisterer.Register(summaryVec)
+ if err == nil {
+ reporter.userSummaryVec.Store(summaryName, summaryVec)
+ summaryVec.With(labelMap).Observe(toSetValue)
+ } else if are, ok := err.(prometheus.AlreadyRegisteredError);
ok {
+ // A summary for that metric has been registered before.
+ // Use the old summary from now on.
+
are.ExistingCollector.(*prometheus.SummaryVec).With(labelMap).Observe(toSetValue)
+ }
+ } else {
+ val.(*prometheus.SummaryVec).With(labelMap).Observe(toSetValue)
+ }
+}
+
+func SetGaugeWithLabel(gaugeName string, val float64, label prometheus.Labels)
{
+ if reporterInstance.reporterConfig.Enable {
+ reporterInstance.setGauge(gaugeName, val, label)
+ }
+}
+
+func SetGauge(gaugeName string, val float64) {
+ if reporterInstance.reporterConfig.Enable {
+ reporterInstance.setGauge(gaugeName, val,
make(prometheus.Labels))
+ }
+}
+
+func IncCounterWithLabel(counterName string, label prometheus.Labels) {
+ if reporterInstance.reporterConfig.Enable {
+ reporterInstance.incCounter(counterName, label)
+ }
+}
+
+func IncCounter(summaryName string) {
+ if reporterInstance.reporterConfig.Enable {
+ reporterInstance.incCounter(summaryName,
make(prometheus.Labels))
+ }
+}
+
+func IncSummaryWithLabel(counterName string, val float64, label
prometheus.Labels) {
+ if reporterInstance.reporterConfig.Enable {
+ reporterInstance.incSummary(counterName, val, label)
+ }
+}
+
+func IncSummary(summaryName string, val float64) {
+ if reporterInstance.reporterConfig.Enable {
+ reporterInstance.incSummary(summaryName, val,
make(prometheus.Labels))
+ }
+}
diff --git a/common/extension/metrics_test.go
b/metrics/prometheus/before_invocation.go
similarity index 53%
copy from common/extension/metrics_test.go
copy to metrics/prometheus/before_invocation.go
index c4cdc41a5..aa288e557 100644
--- a/common/extension/metrics_test.go
+++ b/metrics/prometheus/before_invocation.go
@@ -15,35 +15,4 @@
* limitations under the License.
*/
-package extension
-
-import (
- "context"
- "testing"
- "time"
-)
-
-import (
- "github.com/stretchr/testify/assert"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/metrics"
- "dubbo.apache.org/dubbo-go/v3/protocol"
-)
-
-func TestGetMetricReporter(t *testing.T) {
- reporter := &mockReporter{}
- name := "mock"
- SetMetricReporter(name, func(config *metrics.ReporterConfig)
metrics.Reporter {
- return reporter
- })
- res := GetMetricReporter(name, metrics.NewReporterConfig())
- assert.Equal(t, reporter, res)
-}
-
-type mockReporter struct{}
-
-// Report method for feature expansion
-func (m mockReporter) Report(ctx context.Context, invoker protocol.Invoker,
invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
-}
+package prometheus
diff --git a/metrics/prometheus/common.go b/metrics/prometheus/common.go
new file mode 100644
index 000000000..8936e6729
--- /dev/null
+++ b/metrics/prometheus/common.go
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prometheus
+
+import (
+ "strconv"
+ "strings"
+ "time"
+
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// isProvider reports whether this url represents the application that received
the request as server
+func isProvider(url *common.URL) bool {
+ role := url.GetParam(constant.RegistryRoleKey, "")
+ return strings.EqualFold(role, strconv.Itoa(common.PROVIDER))
+}
+
+// isConsumer reports whether this url represents the application that sent
the request as client
+func isConsumer(url *common.URL) bool {
+ role := url.GetParam(constant.RegistryRoleKey, "")
+ return strings.EqualFold(role, strconv.Itoa(common.CONSUMER))
+}
+
+func newHistogramVec(name, namespace string, labels []string)
*prometheus.HistogramVec {
+ return prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: namespace,
+ Name: name,
+ Buckets: defaultHistogramBucket,
+ },
+ labels)
+}
+
+func newCounter(name, namespace string) prometheus.Counter {
+ return prometheus.NewCounter(
+ prometheus.CounterOpts{
+ Namespace: namespace,
+ Name: name,
+ })
+}
+
+func newCounterVec(name, namespace string, labels []string)
*prometheus.CounterVec {
+ return prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Name: name,
+ Namespace: namespace,
+ }, labels)
+}
+
+func newGauge(name, namespace string) prometheus.Gauge {
+ return prometheus.NewGauge(
+ prometheus.GaugeOpts{
+ Name: name,
+ Namespace: namespace,
+ })
+}
+
+func newGaugeVec(name, namespace string, labels []string) *prometheus.GaugeVec
{
+ return prometheus.NewGaugeVec(
+ prometheus.GaugeOpts{
+ Name: name,
+ Namespace: namespace,
+ }, labels)
+}
+
+func newSummary(name, namespace string) prometheus.Summary {
+ return prometheus.NewSummary(
+ prometheus.SummaryOpts{
+ Name: name,
+ Namespace: namespace,
+ })
+}
+
+// newSummaryVec creates a SummaryVec in the given namespace
+// the objectives are chosen from my experience.
+func newSummaryVec(name, namespace string, labels []string, maxAge int64)
*prometheus.SummaryVec {
+ return prometheus.NewSummaryVec(
+ prometheus.SummaryOpts{
+ Namespace: namespace,
+ Name: name,
+ Objectives: map[float64]float64{
+ 0.5: 0.01,
+ 0.75: 0.01,
+ 0.90: 0.005,
+ 0.98: 0.002,
+ 0.99: 0.001,
+ 0.999: 0.0001,
+ },
+ MaxAge: time.Duration(maxAge),
+ },
+ labels,
+ )
+}
diff --git a/common/extension/metrics_test.go b/metrics/prometheus/constant.go
similarity index 53%
copy from common/extension/metrics_test.go
copy to metrics/prometheus/constant.go
index c4cdc41a5..b4d5e1ca9 100644
--- a/common/extension/metrics_test.go
+++ b/metrics/prometheus/constant.go
@@ -15,35 +15,31 @@
* limitations under the License.
*/
-package extension
+package prometheus
-import (
- "context"
- "testing"
- "time"
-)
+import "dubbo.apache.org/dubbo-go/v3/common/constant"
-import (
- "github.com/stretchr/testify/assert"
-)
+const (
+ reporterName = "prometheus"
+ applicationNameKey = constant.ApplicationNameKey
+ groupKey = constant.GroupKey
+ hostnameKey = constant.HostnameKey
+ interfaceKey = constant.InterfaceKey
+ ipKey = constant.IpKey
+ methodKey = constant.MethodKey
+ versionKey = constant.VersionKey
-import (
- "dubbo.apache.org/dubbo-go/v3/metrics"
- "dubbo.apache.org/dubbo-go/v3/protocol"
-)
+ providerField = "provider"
+ consumerField = "consumer"
+
+ requestsField = "requests"
+ rtField = "rt"
+ tpsField = "tps"
-func TestGetMetricReporter(t *testing.T) {
- reporter := &mockReporter{}
- name := "mock"
- SetMetricReporter(name, func(config *metrics.ReporterConfig)
metrics.Reporter {
- return reporter
- })
- res := GetMetricReporter(name, metrics.NewReporterConfig())
- assert.Equal(t, reporter, res)
-}
-
-type mockReporter struct{}
-
-// Report method for feature expansion
-func (m mockReporter) Report(ctx context.Context, invoker protocol.Invoker,
invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
-}
+ milliSecondsField = "milliseconds"
+
+ counterField = "counter"
+ summaryField = "summary"
+
+ totalField = "total"
+)
diff --git a/metrics/prometheus/metric_set.go b/metrics/prometheus/metric_set.go
new file mode 100644
index 000000000..14e16b77d
--- /dev/null
+++ b/metrics/prometheus/metric_set.go
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package prometheus
+
+import (
+ "strings"
+
+ "dubbo.apache.org/dubbo-go/v3/metrics"
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// metricSet is a set of metrics that are reported to prometheus in dubbo go
+type metricSet struct {
+ // report the consumer-side's rt summary data
+ consumerRTSummaryVec *prometheus.SummaryVec
+ // report the provider-side's rt summary data
+ providerRTSummaryVec *prometheus.SummaryVec
+ // report the provider-side's request total counter data
+ providerRequestTotalCounterVec *prometheus.CounterVec
+ // report the consumer-side's request total counter data
+ consumerRequestTotalCounterVec *prometheus.CounterVec
+ // report the provider-side's processing request counter data
+ // providerRequestProcessingGaugeVec *prometheus.GaugeVec
+ // report the consumer-side's processing request counter data
+ // consumerRequestProcessingGaugeVec *prometheus.GaugeVec
+}
+
+var labelNames = []string{applicationNameKey, groupKey, hostnameKey,
interfaceKey, ipKey, methodKey, versionKey}
+
+// init metric set and register to prometheus
+func (ms *metricSet) initAndRegister(reporterConfig *metrics.ReporterConfig) {
+ ms.consumerRTSummaryVec = newSummaryVec(buildMetricsName(consumerField,
rtField, milliSecondsField, summaryField), reporterConfig.Namespace,
labelNames, reporterConfig.SummaryMaxAge)
+ ms.providerRTSummaryVec = newSummaryVec(buildMetricsName(providerField,
rtField, milliSecondsField, summaryField), reporterConfig.Namespace,
labelNames, reporterConfig.SummaryMaxAge)
+ ms.consumerRequestTotalCounterVec =
newCounterVec(buildMetricsName(consumerField, requestsField, totalField),
reporterConfig.Namespace, labelNames)
+ ms.providerRequestTotalCounterVec =
newCounterVec(buildMetricsName(providerField, requestsField, totalField),
reporterConfig.Namespace, labelNames)
+
+ prometheus.DefaultRegisterer.MustRegister(ms.consumerRTSummaryVec,
ms.providerRTSummaryVec, ms.consumerRequestTotalCounterVec,
ms.providerRequestTotalCounterVec)
+}
+
+func buildMetricsName(args ...string) string {
+ sb := strings.Builder{}
+ for _, arg := range args {
+ sb.WriteString("_")
+ sb.WriteString(arg)
+ }
+ res := strings.TrimPrefix(sb.String(), "_")
+ return res
+}
diff --git a/metrics/prometheus/reporter.go b/metrics/prometheus/reporter.go
index 666670312..dc55ac1ff 100644
--- a/metrics/prometheus/reporter.go
+++ b/metrics/prometheus/reporter.go
@@ -20,49 +20,17 @@ package prometheus
import (
"context"
"net/http"
- "strconv"
- "strings"
"sync"
- "time"
-)
-import (
ocprom "contrib.go.opencensus.io/exporter/prometheus"
-
"github.com/dubbogo/gost/log/logger"
-
"github.com/prometheus/client_golang/prometheus"
- prom "github.com/prometheus/client_golang/prometheus"
-)
-import (
- "dubbo.apache.org/dubbo-go/v3/common"
- "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metrics"
- "dubbo.apache.org/dubbo-go/v3/protocol"
-)
-
-const (
- reporterName = "prometheus"
- serviceKey = constant.ServiceKey
- groupKey = constant.GroupKey
- versionKey = constant.VersionKey
- methodKey = constant.MethodKey
- timeoutKey = constant.TimeoutKey
-
- // to identify side
- providerPrefix = "provider_"
- consumerPrefix = "consumer_"
-
- // to identify the metric's type
- rtSuffix = "_rt"
- // to identify the metric's type
- tpsSuffix = "_tps"
)
var (
- labelNames = []string{serviceKey, groupKey, versionKey,
methodKey, timeoutKey}
reporterInstance *PrometheusReporter
reporterInitOnce sync.Once
defaultHistogramBucket = []float64{10, 50, 100, 200, 500, 1000, 10000}
@@ -70,7 +38,7 @@ var (
// should initialize after loading configuration
func init() {
- //newPrometheusReporter()
+ // newPrometheusReporter()
extension.SetMetricReporter(reporterName, newPrometheusReporter)
}
@@ -80,153 +48,23 @@ func init() {
type PrometheusReporter struct {
reporterServer *http.Server
reporterConfig *metrics.ReporterConfig
- // report the consumer-side's rt gauge data
- consumerRTSummaryVec *prometheus.SummaryVec
- // report the provider-side's rt gauge data
- providerRTSummaryVec *prometheus.SummaryVec
- // todo tps support
- // report the consumer-side's tps gauge data
- consumerTPSGaugeVec *prometheus.GaugeVec
- // report the provider-side's tps gauge data
- providerTPSGaugeVec *prometheus.GaugeVec
-
- userGauge sync.Map
- userSummary sync.Map
- userCounter sync.Map
- userCounterVec sync.Map
- userGaugeVec sync.Map
- userSummaryVec sync.Map
-
+ metricSet
+ syncMaps
namespace string
}
-// Report reports the duration to Prometheus
-// the role in url must be consumer or provider
-// or it will be ignored
-func (reporter *PrometheusReporter) Report(ctx context.Context, invoker
protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res
protocol.Result) {
- if !reporter.reporterConfig.Enable {
- return
- }
-
- url := invoker.GetURL()
- var rtVec *prometheus.SummaryVec
- if isProvider(url) {
- rtVec = reporter.providerRTSummaryVec
- } else if isConsumer(url) {
- rtVec = reporter.consumerRTSummaryVec
- } else {
- logger.Warnf("The url belongs neither the consumer nor the
provider, "+
- "so the invocation will be ignored. url: %s",
url.String())
- return
- }
-
- labels := prometheus.Labels{
- serviceKey: url.Service(),
- groupKey: url.GetParam(groupKey, ""),
- versionKey: url.GetParam(constant.AppVersionKey, ""),
- methodKey: invocation.MethodName(),
- timeoutKey: url.GetParam(timeoutKey, ""),
- }
- costMs := cost.Nanoseconds()
- rtVec.With(labels).Observe(float64(costMs))
-}
-
-func newHistogramVec(name, namespace string, labels []string)
*prometheus.HistogramVec {
- return prometheus.NewHistogramVec(
- prometheus.HistogramOpts{
- Namespace: namespace,
- Name: name,
- Buckets: defaultHistogramBucket,
- },
- labels)
-}
-
-func newCounter(name, namespace string) prometheus.Counter {
- return prometheus.NewCounter(
- prometheus.CounterOpts{
- Namespace: namespace,
- Name: name,
- })
-}
-
-func newCounterVec(name, namespace string, labels []string)
*prometheus.CounterVec {
- return prometheus.NewCounterVec(
- prometheus.CounterOpts{
- Name: name,
- Namespace: namespace,
- }, labels)
-}
-
-func newGauge(name, namespace string) prometheus.Gauge {
- return prometheus.NewGauge(
- prometheus.GaugeOpts{
- Name: name,
- Namespace: namespace,
- })
-}
-
-func newGaugeVec(name, namespace string, labels []string) *prometheus.GaugeVec
{
- return prometheus.NewGaugeVec(
- prometheus.GaugeOpts{
- Name: name,
- Namespace: namespace,
- }, labels)
-}
-
-func newSummary(name, namespace string) prometheus.Summary {
- return prometheus.NewSummary(
- prometheus.SummaryOpts{
- Name: name,
- Namespace: namespace,
- })
-}
-
-// newSummaryVec create SummaryVec, the Namespace is dubbo
-// the objectives is from my experience.
-func newSummaryVec(name, namespace string, labels []string, maxAge int64)
*prometheus.SummaryVec {
- return prometheus.NewSummaryVec(
- prometheus.SummaryOpts{
- Namespace: namespace,
- Name: name,
- Objectives: map[float64]float64{
- 0.5: 0.01,
- 0.75: 0.01,
- 0.90: 0.005,
- 0.98: 0.002,
- 0.99: 0.001,
- 0.999: 0.0001,
- },
- MaxAge: time.Duration(maxAge),
- },
- labels,
- )
-}
-
-// isProvider shows whether this url represents the application received the
request as server
-func isProvider(url *common.URL) bool {
- role := url.GetParam(constant.RegistryRoleKey, "")
- return strings.EqualFold(role, strconv.Itoa(common.PROVIDER))
-}
-
-// isConsumer shows whether this url represents the application sent then
request as client
-func isConsumer(url *common.URL) bool {
- role := url.GetParam(constant.RegistryRoleKey, "")
- return strings.EqualFold(role, strconv.Itoa(common.CONSUMER))
-}
-
// newPrometheusReporter create new prometheusReporter
// it will register the metrics into prometheus
func newPrometheusReporter(reporterConfig *metrics.ReporterConfig)
metrics.Reporter {
if reporterInstance == nil {
reporterInitOnce.Do(func() {
+ ms := &metricSet{}
+ ms.initAndRegister(reporterConfig)
reporterInstance = &PrometheusReporter{
- reporterConfig: reporterConfig,
- namespace: reporterConfig.Namespace,
- consumerRTSummaryVec:
newSummaryVec(consumerPrefix+serviceKey+rtSuffix, reporterConfig.Namespace,
labelNames, reporterConfig.SummaryMaxAge),
- providerRTSummaryVec:
newSummaryVec(providerPrefix+serviceKey+rtSuffix, reporterConfig.Namespace,
labelNames, reporterConfig.SummaryMaxAge),
+ reporterConfig: reporterConfig,
+ namespace: reporterConfig.Namespace,
+ metricSet: *ms,
}
-
-
prom.DefaultRegisterer.MustRegister(reporterInstance.consumerRTSummaryVec,
reporterInstance.providerRTSummaryVec)
})
}
@@ -242,175 +80,9 @@ func newPrometheusReporter(reporterConfig
*metrics.ReporterConfig) metrics.Repor
return reporterInstance
}
-// setGauge set gauge to target value with given label, if label is not empty,
set gauge vec
-// if target gauge/gaugevec not exist, just create new gauge and set the value
-func (reporter *PrometheusReporter) setGauge(gaugeName string, toSetValue
float64, labelMap prometheus.Labels) {
- if len(labelMap) == 0 {
- // gauge
- if val, exist := reporter.userGauge.Load(gaugeName); !exist {
- gauge := newGauge(gaugeName, reporter.namespace)
- err := prom.DefaultRegisterer.Register(gauge)
- if err == nil {
- reporter.userGauge.Store(gaugeName, gauge)
- gauge.Set(toSetValue)
- } else if are, ok :=
err.(prometheus.AlreadyRegisteredError); ok {
- // A gauge for that metric has been registered
before.
- // Use the old gauge from now on.
-
are.ExistingCollector.(prometheus.Gauge).Set(toSetValue)
- }
-
- } else {
- val.(prometheus.Gauge).Set(toSetValue)
- }
- return
- }
-
- // gauge vec
- if val, exist := reporter.userGaugeVec.Load(gaugeName); !exist {
- keyList := make([]string, 0)
- for k, _ := range labelMap {
- keyList = append(keyList, k)
- }
- gaugeVec := newGaugeVec(gaugeName, reporter.namespace, keyList)
- err := prom.DefaultRegisterer.Register(gaugeVec)
- if err == nil {
- reporter.userGaugeVec.Store(gaugeName, gaugeVec)
- gaugeVec.With(labelMap).Set(toSetValue)
- } else if are, ok := err.(prometheus.AlreadyRegisteredError);
ok {
- // A gauge for that metric has been registered before.
- // Use the old gauge from now on.
-
are.ExistingCollector.(*prometheus.GaugeVec).With(labelMap).Set(toSetValue)
- }
- } else {
- val.(*prometheus.GaugeVec).With(labelMap).Set(toSetValue)
- }
-}
-
-// incCounter inc counter to inc if label is not empty, set counter vec
-// if target counter/counterVec not exist, just create new counter and inc the
value
-func (reporter *PrometheusReporter) incCounter(counterName string, labelMap
prometheus.Labels) {
- if len(labelMap) == 0 {
- // counter
- if val, exist := reporter.userCounter.Load(counterName); !exist
{
- counter := newCounter(counterName, reporter.namespace)
- err := prom.DefaultRegisterer.Register(counter)
- if err == nil {
- reporter.userCounter.Store(counterName, counter)
- counter.Inc()
- } else if are, ok :=
err.(prometheus.AlreadyRegisteredError); ok {
- // A counter for that metric has been
registered before.
- // Use the old counter from now on.
- are.ExistingCollector.(prometheus.Counter).Inc()
- }
- } else {
- val.(prometheus.Counter).Inc()
- }
- return
- }
-
- // counter vec inc
- if val, exist := reporter.userCounterVec.Load(counterName); !exist {
- keyList := make([]string, 0)
- for k, _ := range labelMap {
- keyList = append(keyList, k)
- }
- counterVec := newCounterVec(counterName, reporter.namespace,
keyList)
- err := prom.DefaultRegisterer.Register(counterVec)
- if err == nil {
- reporter.userCounterVec.Store(counterName, counterVec)
- counterVec.With(labelMap).Inc()
- } else if are, ok := err.(prometheus.AlreadyRegisteredError);
ok {
- // A counter for that metric has been registered before.
- // Use the old counter from now on.
-
are.ExistingCollector.(*prometheus.CounterVec).With(labelMap).Inc()
- }
- } else {
- val.(*prometheus.CounterVec).With(labelMap).Inc()
- }
-}
-
-// incSummary inc summary to target value with given label, if label is not
empty, set summary vec
-// if target summary/summaryVec not exist, just create new summary and set the
value
-func (reporter *PrometheusReporter) incSummary(summaryName string, toSetValue
float64, labelMap prometheus.Labels) {
- if len(labelMap) == 0 {
- // summary
- if val, exist := reporter.userSummary.Load(summaryName); !exist
{
- summary := newSummary(summaryName, reporter.namespace)
- err := prom.DefaultRegisterer.Register(summary)
- if err == nil {
- reporter.userSummary.Store(summaryName, summary)
- summary.Observe(toSetValue)
- } else if are, ok :=
err.(prometheus.AlreadyRegisteredError); ok {
- // A summary for that metric has been
registered before.
- // Use the old summary from now on.
-
are.ExistingCollector.(prometheus.Summary).Observe(toSetValue)
- }
- } else {
- val.(prometheus.Summary).Observe(toSetValue)
- }
- return
- }
-
- // summary vec
- if val, exist := reporter.userSummaryVec.Load(summaryName); !exist {
- keyList := make([]string, 0)
- for k, _ := range labelMap {
- keyList = append(keyList, k)
- }
- summaryVec := newSummaryVec(summaryName, reporter.namespace,
keyList, reporter.reporterConfig.SummaryMaxAge)
- err := prom.DefaultRegisterer.Register(summaryVec)
- if err == nil {
- reporter.userSummaryVec.Store(summaryName, summaryVec)
- summaryVec.With(labelMap).Observe(toSetValue)
- } else if are, ok := err.(prometheus.AlreadyRegisteredError);
ok {
- // A summary for that metric has been registered before.
- // Use the old summary from now on.
-
are.ExistingCollector.(*prometheus.SummaryVec).With(labelMap).Observe(toSetValue)
- }
- } else {
- val.(*prometheus.SummaryVec).With(labelMap).Observe(toSetValue)
- }
-}
-
-func SetGaugeWithLabel(gaugeName string, val float64, label prometheus.Labels)
{
- if reporterInstance.reporterConfig.Enable {
- reporterInstance.setGauge(gaugeName, val, label)
- }
-}
-
-func SetGauge(gaugeName string, val float64) {
- if reporterInstance.reporterConfig.Enable {
- reporterInstance.setGauge(gaugeName, val,
make(prometheus.Labels))
- }
-}
-
-func IncCounterWithLabel(counterName string, label prometheus.Labels) {
- if reporterInstance.reporterConfig.Enable {
- reporterInstance.incCounter(counterName, label)
- }
-}
-
-func IncCounter(summaryName string) {
- if reporterInstance.reporterConfig.Enable {
- reporterInstance.incCounter(summaryName,
make(prometheus.Labels))
- }
-}
-
-func IncSummaryWithLabel(counterName string, val float64, label
prometheus.Labels) {
- if reporterInstance.reporterConfig.Enable {
- reporterInstance.incSummary(counterName, val, label)
- }
-}
-
-func IncSummary(summaryName string, val float64) {
- if reporterInstance.reporterConfig.Enable {
- reporterInstance.incSummary(summaryName, val,
make(prometheus.Labels))
- }
-}
-
func (reporter *PrometheusReporter) startupServer(reporterConfig
*metrics.ReporterConfig) {
metricsExporter, err := ocprom.NewExporter(ocprom.Options{
- Registry: prom.DefaultRegisterer.(*prom.Registry),
+ Registry: prometheus.DefaultRegisterer.(*prometheus.Registry),
})
if err != nil {
logger.Errorf("new prometheus reporter with error = %s", err)
diff --git a/metrics/prometheus/reporter_test.go
b/metrics/prometheus/reporter_test.go
index 8ea17d107..d48d92f13 100644
--- a/metrics/prometheus/reporter_test.go
+++ b/metrics/prometheus/reporter_test.go
@@ -21,18 +21,13 @@ import (
"context"
"testing"
"time"
-)
-import (
- "github.com/stretchr/testify/assert"
-)
-
-import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/metrics"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
+ "github.com/stretchr/testify/assert"
)
func TestPrometheusReporter_Report(t *testing.T) {
@@ -50,7 +45,7 @@ func TestPrometheusReporter_Report(t *testing.T) {
assert.False(t, isConsumer(url))
ctx := context.Background()
- reporter.Report(ctx, invoker, inv, 100*time.Millisecond, nil)
+ reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond,
nil)
// consumer side
url, _ = common.NewURL(
@@ -60,7 +55,7 @@ func TestPrometheusReporter_Report(t *testing.T) {
"BDTService&organization=ikurento.com&owner=ZX&registry.role=0&retries=&" +
"service.filter=echo%2Ctoken%2Caccesslog&timestamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker = protocol.NewBaseInvoker(url)
- reporter.Report(ctx, invoker, inv, 100*time.Millisecond, nil)
+ reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond,
nil)
// invalid role
url, _ = common.NewURL(
@@ -70,5 +65,5 @@ func TestPrometheusReporter_Report(t *testing.T) {
"BDTService&organization=ikurento.com&owner=ZX&registry.role=9&retries=&" +
"service.filter=echo%2Ctoken%2Caccesslog&timestamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker = protocol.NewBaseInvoker(url)
- reporter.Report(ctx, invoker, inv, 100*time.Millisecond, nil)
+ reporter.ReportAfterInvocation(ctx, invoker, inv, 100*time.Millisecond,
nil)
}
diff --git a/metrics/reporter.go b/metrics/reporter.go
index b0e661743..4a016e39c 100644
--- a/metrics/reporter.go
+++ b/metrics/reporter.go
@@ -20,9 +20,7 @@ package metrics
import (
"context"
"time"
-)
-import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)
@@ -61,6 +59,6 @@ func NewReporterConfig() *ReporterConfig {
//
// Report method reports the duration of an invocation.
type Reporter interface {
- Report(ctx context.Context, invoker protocol.Invoker, invocation
protocol.Invocation,
+ ReportAfterInvocation(ctx context.Context, invoker protocol.Invoker,
invocation protocol.Invocation,
cost time.Duration, res protocol.Result)
}