This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 09dc9e06d fix(collector): fix the write overwrite caused by multiple collectors sharing a global variable DataSource (#1857)
09dc9e06d is described below
commit 09dc9e06d7361082716c34ebf4099aa1a4681ad4
Author: Wang Guangshuo <[email protected]>
AuthorDate: Thu Jan 18 14:23:17 2024 +0800
fix(collector): fix the write overwrite caused by multiple collectors sharing a global variable DataSource (#1857)
https://github.com/apache/incubator-pegasus/issues/1820
---
collector/metrics/metric_collector.go | 61 ++++++++++++++++-------------------
1 file changed, 27 insertions(+), 34 deletions(-)
diff --git a/collector/metrics/metric_collector.go b/collector/metrics/metric_collector.go
index f3c47785e..9e6f57bbb 100644
--- a/collector/metrics/metric_collector.go
+++ b/collector/metrics/metric_collector.go
@@ -34,8 +34,8 @@ import (
)
const (
- MetaServer int = 0
- ReplicaServer int = 1
+ MetaServer string = "meta"
+ ReplicaServer string = "replica"
)
type Metric struct {
@@ -53,10 +53,6 @@ var GaugeMetricsMap map[string]prometheus.GaugeVec
var CounterMetricsMap map[string]prometheus.CounterVec
var SummaryMetricsMap map[string]prometheus.Summary
-// DataSource 0 meta server, 1 replica server.
-var DataSource int
-var RoleByDataSource map[int]string
-
var TableNameByID map[string]string
type MetricCollector interface {
@@ -64,25 +60,23 @@ type MetricCollector interface {
}
func NewMetricCollector(
- dataSource int,
+ role string,
detectInterval time.Duration,
detectTimeout time.Duration) MetricCollector {
- DataSource = dataSource
GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128)
CounterMetricsMap = make(map[string]prometheus.CounterVec, 128)
SummaryMetricsMap = make(map[string]prometheus.Summary, 128)
- RoleByDataSource = make(map[int]string, 128)
TableNameByID = make(map[string]string, 128)
- RoleByDataSource[0] = "meta"
- RoleByDataSource[1] = "replica"
- initMetrics()
- return &Collector{detectInterval: detectInterval, detectTimeout: detectTimeout}
+ var collector = Collector{detectInterval: detectInterval, detectTimeout: detectTimeout, role: role}
+ collector.initMetrics()
+ return &collector
}
type Collector struct {
detectInterval time.Duration
detectTimeout time.Duration
+ role string
}
func (collector *Collector) Start(tom *tomb.Tomb) error {
@@ -93,7 +87,7 @@ func (collector *Collector) Start(tom *tomb.Tomb) error {
return nil
case <-ticker.C:
updateClusterTableInfo()
- processAllServerMetrics()
+ collector.processAllServerMetrics()
}
}
}
@@ -124,10 +118,10 @@ func getReplicaAddrs() ([]string, error) {
}
// Register all metrics.
-func initMetrics() {
+func (collector *Collector) initMetrics() {
var addrs []string
var err error
- if DataSource == MetaServer {
+ if collector.role == MetaServer {
addrs = viper.GetStringSlice("meta_servers")
} else {
addrs, err = getReplicaAddrs()
@@ -188,10 +182,10 @@ func initMetrics() {
}
// Parse metric data and update metrics.
-func processAllServerMetrics() {
+func (collector *Collector) processAllServerMetrics() {
var addrs []string
var err error
- if DataSource == MetaServer {
+ if collector.role == MetaServer {
addrs = viper.GetStringSlice("meta_servers")
} else {
addrs, err = getReplicaAddrs()
@@ -225,7 +219,7 @@ func processAllServerMetrics() {
tableID, &metricsByTableID)
collectServerLevelTableMetric(entity.Get("metrics").Array(), tableID,
&metricsByServerTableID)
- updateServerLevelTableMetrics(addr, metricsByServerTableID)
+ collector.updateServerLevelTableMetrics(addr, metricsByServerTableID)
case "server":
mergeIntoClusterLevelServerMetric(entity.Get("metrics").Array(),
metricsOfCluster)
@@ -237,13 +231,13 @@ func processAllServerMetrics() {
}
}
- updateClusterLevelTableMetrics(metricsByTableID)
- updateServerLevelServerMetrics(metricsByAddr)
- updateClusterLevelMetrics(metricsOfCluster)
+ collector.updateClusterLevelTableMetrics(metricsByTableID)
+ collector.updateServerLevelServerMetrics(metricsByAddr)
+ collector.updateClusterLevelMetrics(metricsOfCluster)
}
// Update table metrics. They belong to a specified server.
-func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) {
+func (collector *Collector) updateServerLevelTableMetrics(addr string, metricsByServerTableID map[string]Metrics) {
for tableID, metrics := range metricsByServerTableID {
var tableName string
if name, ok := TableNameByID[tableID]; !ok {
@@ -252,29 +246,29 @@ func updateServerLevelTableMetrics(addr string, metricsByServerTableID map[strin
tableName = name
}
for _, metric := range metrics {
- updateMetric(metric, addr, "server", tableName)
+ collector.updateMetric(metric, addr, "server", tableName)
}
}
}
// Update server metrics. They belong to a specified server.
-func updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) {
+func (collector *Collector) updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) {
for addr, metrics := range metricsByAddr {
for _, metric := range metrics {
- updateMetric(metric, addr, "server", "server")
+ collector.updateMetric(metric, addr, "server", "server")
}
}
}
// Update cluster level metrics. They belong to a cluster.
-func updateClusterLevelMetrics(metricsOfCluster []Metric) {
+func (collector *Collector) updateClusterLevelMetrics(metricsOfCluster []Metric) {
for _, metric := range metricsOfCluster {
- updateMetric(metric, "cluster", "server", metric.name)
+ collector.updateMetric(metric, "cluster", "server", metric.name)
}
}
// Update table metrics. They belong to a cluster.
-func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) {
+func (collector *Collector) updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) {
for tableID, metrics := range metricsByTableID {
var tableName string
if name, ok := TableNameByID[tableID]; !ok {
@@ -283,19 +277,18 @@ func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) {
tableName = name
}
for _, metric := range metrics {
- updateMetric(metric, "cluster", "table", tableName)
+ collector.updateMetric(metric, "cluster", "table", tableName)
}
}
}
-func updateMetric(metric Metric, endpoint string, level string, title string) {
- role := RoleByDataSource[DataSource]
+func (collector *Collector) updateMetric(metric Metric, endpoint string, level string, title string) {
switch metric.mtype {
case "Counter":
if counter, ok := CounterMetricsMap[metric.name]; ok {
counter.With(
prometheus.Labels{"endpoint": endpoint,
- "role": role, "level": level,
+ "role": collector.role, "level": level,
"title": title}).Add(float64(metric.value))
} else {
log.Warnf("Unknown metric name %s", metric.name)
@@ -304,7 +297,7 @@ func updateMetric(metric Metric, endpoint string, level string, title string) {
if gauge, ok := GaugeMetricsMap[metric.name]; ok {
gauge.With(
prometheus.Labels{"endpoint": endpoint,
- "role": role, "level": level,
+ "role": collector.role, "level": level,
"title": title}).Set(float64(metric.value))
} else {
log.Warnf("Unknown metric name %s", metric.name)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]