This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 09dc9e06d fix(collector): fix the write overwrite caused by multiple 
collectors sharing a global variable DataSource (#1857)
09dc9e06d is described below

commit 09dc9e06d7361082716c34ebf4099aa1a4681ad4
Author: Wang Guangshuo <[email protected]>
AuthorDate: Thu Jan 18 14:23:17 2024 +0800

    fix(collector): fix the write overwrite caused by multiple collectors 
sharing a global variable DataSource (#1857)
    
    https://github.com/apache/incubator-pegasus/issues/1820
---
 collector/metrics/metric_collector.go | 61 ++++++++++++++++-------------------
 1 file changed, 27 insertions(+), 34 deletions(-)

diff --git a/collector/metrics/metric_collector.go 
b/collector/metrics/metric_collector.go
index f3c47785e..9e6f57bbb 100644
--- a/collector/metrics/metric_collector.go
+++ b/collector/metrics/metric_collector.go
@@ -34,8 +34,8 @@ import (
 )
 
 const (
-       MetaServer    int = 0
-       ReplicaServer int = 1
+       MetaServer    string = "meta"
+       ReplicaServer string = "replica"
 )
 
 type Metric struct {
@@ -53,10 +53,6 @@ var GaugeMetricsMap map[string]prometheus.GaugeVec
 var CounterMetricsMap map[string]prometheus.CounterVec
 var SummaryMetricsMap map[string]prometheus.Summary
 
-// DataSource 0 meta server, 1 replica server.
-var DataSource int
-var RoleByDataSource map[int]string
-
 var TableNameByID map[string]string
 
 type MetricCollector interface {
@@ -64,25 +60,23 @@ type MetricCollector interface {
 }
 
 func NewMetricCollector(
-       dataSource int,
+       role string,
        detectInterval time.Duration,
        detectTimeout time.Duration) MetricCollector {
-       DataSource = dataSource
        GaugeMetricsMap = make(map[string]prometheus.GaugeVec, 128)
        CounterMetricsMap = make(map[string]prometheus.CounterVec, 128)
        SummaryMetricsMap = make(map[string]prometheus.Summary, 128)
-       RoleByDataSource = make(map[int]string, 128)
        TableNameByID = make(map[string]string, 128)
-       RoleByDataSource[0] = "meta"
-       RoleByDataSource[1] = "replica"
-       initMetrics()
 
-       return &Collector{detectInterval: detectInterval, detectTimeout: 
detectTimeout}
+       var collector = Collector{detectInterval: detectInterval, 
detectTimeout: detectTimeout, role: role}
+       collector.initMetrics()
+       return &collector
 }
 
 type Collector struct {
        detectInterval time.Duration
        detectTimeout  time.Duration
+       role           string
 }
 
 func (collector *Collector) Start(tom *tomb.Tomb) error {
@@ -93,7 +87,7 @@ func (collector *Collector) Start(tom *tomb.Tomb) error {
                        return nil
                case <-ticker.C:
                        updateClusterTableInfo()
-                       processAllServerMetrics()
+                       collector.processAllServerMetrics()
                }
        }
 }
@@ -124,10 +118,10 @@ func getReplicaAddrs() ([]string, error) {
 }
 
 // Register all metrics.
-func initMetrics() {
+func (collector *Collector) initMetrics() {
        var addrs []string
        var err error
-       if DataSource == MetaServer {
+       if collector.role == MetaServer {
                addrs = viper.GetStringSlice("meta_servers")
        } else {
                addrs, err = getReplicaAddrs()
@@ -188,10 +182,10 @@ func initMetrics() {
 }
 
 // Parse metric data and update metrics.
-func processAllServerMetrics() {
+func (collector *Collector) processAllServerMetrics() {
        var addrs []string
        var err error
-       if DataSource == MetaServer {
+       if collector.role == MetaServer {
                addrs = viper.GetStringSlice("meta_servers")
        } else {
                addrs, err = getReplicaAddrs()
@@ -225,7 +219,7 @@ func processAllServerMetrics() {
                                        tableID, &metricsByTableID)
                                
collectServerLevelTableMetric(entity.Get("metrics").Array(), tableID,
                                        &metricsByServerTableID)
-                               updateServerLevelTableMetrics(addr, 
metricsByServerTableID)
+                               collector.updateServerLevelTableMetrics(addr, 
metricsByServerTableID)
                        case "server":
                                
mergeIntoClusterLevelServerMetric(entity.Get("metrics").Array(),
                                        metricsOfCluster)
@@ -237,13 +231,13 @@ func processAllServerMetrics() {
                }
        }
 
-       updateClusterLevelTableMetrics(metricsByTableID)
-       updateServerLevelServerMetrics(metricsByAddr)
-       updateClusterLevelMetrics(metricsOfCluster)
+       collector.updateClusterLevelTableMetrics(metricsByTableID)
+       collector.updateServerLevelServerMetrics(metricsByAddr)
+       collector.updateClusterLevelMetrics(metricsOfCluster)
 }
 
 // Update table metrics. They belong to a specified server.
-func updateServerLevelTableMetrics(addr string, metricsByServerTableID 
map[string]Metrics) {
+func (collector *Collector) updateServerLevelTableMetrics(addr string, 
metricsByServerTableID map[string]Metrics) {
        for tableID, metrics := range metricsByServerTableID {
                var tableName string
                if name, ok := TableNameByID[tableID]; !ok {
@@ -252,29 +246,29 @@ func updateServerLevelTableMetrics(addr string, 
metricsByServerTableID map[strin
                        tableName = name
                }
                for _, metric := range metrics {
-                       updateMetric(metric, addr, "server", tableName)
+                       collector.updateMetric(metric, addr, "server", 
tableName)
                }
        }
 }
 
 // Update server metrics. They belong to a specified server.
-func updateServerLevelServerMetrics(metricsByAddr map[string]Metrics) {
+func (collector *Collector) updateServerLevelServerMetrics(metricsByAddr 
map[string]Metrics) {
        for addr, metrics := range metricsByAddr {
                for _, metric := range metrics {
-                       updateMetric(metric, addr, "server", "server")
+                       collector.updateMetric(metric, addr, "server", "server")
                }
        }
 }
 
 // Update cluster level metrics. They belong to a cluster.
-func updateClusterLevelMetrics(metricsOfCluster []Metric) {
+func (collector *Collector) updateClusterLevelMetrics(metricsOfCluster 
[]Metric) {
        for _, metric := range metricsOfCluster {
-               updateMetric(metric, "cluster", "server", metric.name)
+               collector.updateMetric(metric, "cluster", "server", metric.name)
        }
 }
 
 // Update table metrics. They belong to a cluster.
-func updateClusterLevelTableMetrics(metricsByTableID map[string]Metrics) {
+func (collector *Collector) updateClusterLevelTableMetrics(metricsByTableID 
map[string]Metrics) {
        for tableID, metrics := range metricsByTableID {
                var tableName string
                if name, ok := TableNameByID[tableID]; !ok {
@@ -283,19 +277,18 @@ func updateClusterLevelTableMetrics(metricsByTableID 
map[string]Metrics) {
                        tableName = name
                }
                for _, metric := range metrics {
-                       updateMetric(metric, "cluster", "table", tableName)
+                       collector.updateMetric(metric, "cluster", "table", 
tableName)
                }
        }
 }
 
-func updateMetric(metric Metric, endpoint string, level string, title string) {
-       role := RoleByDataSource[DataSource]
+func (collector *Collector) updateMetric(metric Metric, endpoint string, level 
string, title string) {
        switch metric.mtype {
        case "Counter":
                if counter, ok := CounterMetricsMap[metric.name]; ok {
                        counter.With(
                                prometheus.Labels{"endpoint": endpoint,
-                                       "role": role, "level": level,
+                                       "role": collector.role, "level": level,
                                        "title": 
title}).Add(float64(metric.value))
                } else {
                        log.Warnf("Unknown metric name %s", metric.name)
@@ -304,7 +297,7 @@ func updateMetric(metric Metric, endpoint string, level 
string, title string) {
                if gauge, ok := GaugeMetricsMap[metric.name]; ok {
                        gauge.With(
                                prometheus.Labels{"endpoint": endpoint,
-                                       "role": role, "level": level,
+                                       "role": collector.role, "level": level,
                                        "title": 
title}).Set(float64(metric.value))
                } else {
                        log.Warnf("Unknown metric name %s", metric.name)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to