This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new a9a11da88 feat(collector): calculate the total statistics for both 
reads and writes and record them as history for analysis of hotspot partitions 
(#2359)
a9a11da88 is described below

commit a9a11da886bd32d170fc166c08b69210d89d6a52
Author: Dan Wang <[email protected]>
AuthorDate: Thu Feb 5 15:16:44 2026 +0800

    feat(collector): calculate the total statistics for both reads and writes 
and record them as history for analysis of hotspot partitions (#2359)
    
    https://github.com/apache/incubator-pegasus/issues/2358
    
    Based on the aggregation results, calculate the total statistics for both 
reads and
    writes, and record them as history for analysis of hotspot partitions.
---
 collector/config.yml                    |   1 +
 collector/go.mod                        |   1 +
 collector/go.sum                        |   2 +
 collector/hotspot/partition_detector.go | 163 ++++++++++++++++++++++++++------
 4 files changed, 138 insertions(+), 29 deletions(-)

diff --git a/collector/config.yml b/collector/config.yml
index cfd153f1d..1d63057ee 100644
--- a/collector/config.yml
+++ b/collector/config.yml
@@ -52,3 +52,4 @@ hotspot:
   partition_detect_interval : 30s
   pull_metrics_timeout : 5s
   sample_metrics_interval : 10s
+  max_sample_size : 128
diff --git a/collector/go.mod b/collector/go.mod
index 6b6e2fa69..5aabb0b16 100644
--- a/collector/go.mod
+++ b/collector/go.mod
@@ -21,6 +21,7 @@ go 1.18
 
 require (
        github.com/apache/incubator-pegasus/go-client 
v0.0.0-20260121121155-96868ed93b2a
+       github.com/gammazero/deque v1.0.0
        github.com/kataras/iris/v12 v12.2.0
        github.com/prometheus/client_golang v1.18.0
        github.com/sirupsen/logrus v1.8.1
diff --git a/collector/go.sum b/collector/go.sum
index dfb81a2c4..95ebed26d 100644
--- a/collector/go.sum
+++ b/collector/go.sum
@@ -85,6 +85,8 @@ github.com/fortytw2/leaktest v1.3.0 
h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
 github.com/fsnotify/fsnotify v1.4.7/go.mod 
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
 github.com/fsnotify/fsnotify v1.5.4 
h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
 github.com/fsnotify/fsnotify v1.5.4/go.mod 
h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
+github.com/gammazero/deque v1.0.0 
h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
+github.com/gammazero/deque v1.0.0/go.mod 
h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
 github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod 
h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/ghodss/yaml v1.0.0/go.mod 
h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
 github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod 
h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
diff --git a/collector/hotspot/partition_detector.go 
b/collector/hotspot/partition_detector.go
index 5238aa459..718743d9d 100644
--- a/collector/hotspot/partition_detector.go
+++ b/collector/hotspot/partition_detector.go
@@ -21,12 +21,14 @@ import (
        "context"
        "fmt"
        "strconv"
+       "sync"
        "time"
 
        "github.com/apache/incubator-pegasus/collector/metrics"
        client "github.com/apache/incubator-pegasus/go-client/admin"
        "github.com/apache/incubator-pegasus/go-client/idl/admin"
        "github.com/apache/incubator-pegasus/go-client/idl/replication"
+       "github.com/gammazero/deque"
        log "github.com/sirupsen/logrus"
        "github.com/spf13/viper"
        "golang.org/x/sync/errgroup"
@@ -43,6 +45,7 @@ type PartitionDetectorConfig struct {
        DetectInterval        time.Duration
        PullMetricsTimeout    time.Duration
        SampleMetricsInterval time.Duration
+       MaxSampleSize         int
 }
 
 func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
@@ -52,6 +55,7 @@ func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
                DetectInterval:        
viper.GetDuration("hotspot.partition_detect_interval"),
                PullMetricsTimeout:    
viper.GetDuration("hotspot.pull_metrics_timeout"),
                SampleMetricsInterval: 
viper.GetDuration("hotspot.sample_metrics_interval"),
+               MaxSampleSize:         viper.GetInt("hotspot.max_sample_size"),
        }
 }
 
@@ -78,12 +82,15 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig) 
(PartitionDetector, erro
        }
 
        return &partitionDetectorImpl{
-               cfg: cfg,
+               cfg:       cfg,
+               analyzers: make(map[partitionAnalyzerKey]*partitionAnalyzer),
        }, nil
 }
 
 type partitionDetectorImpl struct {
-       cfg *PartitionDetectorConfig
+       cfg       *PartitionDetectorConfig
+       mtx       sync.RWMutex
+       analyzers map[partitionAnalyzerKey]*partitionAnalyzer
 }
 
 func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
@@ -115,36 +122,42 @@ type appStats struct {
        appName          string
        partitionCount   int32
        partitionConfigs []*replication.PartitionConfiguration
-       partitionStats   []map[string]float64 // {metric_name -> metric_value} 
for each partition.
+       partitionStats   []map[string]float64 // {metricName -> metricValue} 
for each partition.
 }
 
 func (d *partitionDetectorImpl) aggregate() error {
-       adminClient := client.NewClient(client.Config{
-               MetaServers: d.cfg.MetaServers,
-               Timeout:     d.cfg.RpcTimeout,
-       })
-       defer adminClient.Close()
-
-       // appMap is the final structure that includes all the statistical 
values.
-       appMap, err := pullTablePartitions(adminClient)
+       // appMap includes the structures that hold all the final statistical 
values.
+       appMap, nodes, err := d.fetchMetadata()
        if err != nil {
                return err
        }
 
-       err = d.aggregateMetrics(adminClient, appMap)
+       err = d.aggregateMetrics(appMap, nodes)
        if err != nil {
                return err
        }
 
+       d.addHotspotSamples(appMap)
+
        return nil
 }
 
-// Pull metadata of all available tables with all their partitions and form 
the final structure
-// that includes all the statistical values.
-func pullTablePartitions(adminClient client.Client) (appStatsMap, error) {
+// Fetch necessary metadata from meta server for the aggregation of metrics, 
including:
+// - the metadata of all available tables with all their partitions, and
+// - the node information of all replica servers.
+// Also, the returned appStatsMap includes the structures that hold all the 
final
+// statistical values.
+func (d *partitionDetectorImpl) fetchMetadata() (appStatsMap, 
[]*admin.NodeInfo, error) {
+       adminClient := client.NewClient(client.Config{
+               MetaServers: d.cfg.MetaServers,
+               Timeout:     d.cfg.RpcTimeout,
+       })
+       defer adminClient.Close()
+
+       // Fetch the information of all available tables.
        tables, err := adminClient.ListTables()
        if err != nil {
-               return nil, err
+               return nil, nil, err
        }
 
        appMap := make(appStatsMap)
@@ -152,7 +165,7 @@ func pullTablePartitions(adminClient client.Client) 
(appStatsMap, error) {
                // Query metadata for each partition of each table.
                appID, partitionCount, partitionConfigs, err := 
adminClient.QueryConfig(table.AppName)
                if err != nil {
-                       return nil, err
+                       return nil, nil, err
                }
 
                // Initialize statistical value for each partition.
@@ -170,20 +183,22 @@ func pullTablePartitions(adminClient client.Client) 
(appStatsMap, error) {
                }
        }
 
-       return appMap, nil
-}
-
-type aggregator func(map[string]float64, string, float64)
-
-// Pull metrics from all nodes and aggregate them to produce the statistics.
-func (d *partitionDetectorImpl) aggregateMetrics(adminClient client.Client, 
appMap appStatsMap) error {
+       // Fetch the node information of all replica servers.
        nodes, err := adminClient.ListNodes()
        if err != nil {
-               return err
+               return nil, nil, err
        }
 
-       // Pull multiple results of metrics to perform cumulative calculation 
to produce the
-       // statistics such as QPS.
+       return appMap, nodes, nil
+}
+
+type aggregator func(map[string]float64, string, float64)
+
+// Pull metric samples from nodes and aggregate them to produce the final 
statistical results
+// into appMap.
+func (d *partitionDetectorImpl) aggregateMetrics(appMap appStatsMap, nodes 
[]*admin.NodeInfo) error {
+       // Pull multiple samples of metrics to perform cumulative calculation 
to produce the
+       // statistical results such as QPS.
        startSnapshots, err := d.pullMetrics(nodes)
        if err != nil {
                return err
@@ -204,7 +219,7 @@ func (d *partitionDetectorImpl) 
aggregateMetrics(adminClient client.Client, appM
 
                d.calculateStats(snapshot, nodes[i],
                        func(stats map[string]float64, key string, operand 
float64) {
-                               // Just set the ending number of requests.
+                               // Just set the number of requests with the 
ending snapshot.
                                stats[key] = operand
                        },
                        appMap)
@@ -220,7 +235,7 @@ func (d *partitionDetectorImpl) 
aggregateMetrics(adminClient client.Client, appM
                                                return
                                        }
 
-                                       // Calculate QPS based on the ending 
number of requests that have been
+                                       // Calculate QPS based on the ending 
snapshot that has been
                                        // set previously.
                                        stats[key] = (value - operand) / 
duration.Seconds()
                                }
@@ -348,3 +363,93 @@ func (d *partitionDetectorImpl) calculateStats(
                }
        }
 }
+
+// Since the number of partitions of a table might change, use (appID, 
partitionCount)
+// pair as the key for each table.
+type partitionAnalyzerKey struct {
+       appID          int32
+       partitionCount int32
+}
+
+const (
+       readHotspotData = iota
+       writeHotspotData
+       operateHotspotDataNumber
+)
+
+// hotspotPartitionStats holds all the statistical values of each partition, 
used for analysis
+// of hotspot partitions.
+type hotspotPartitionStats struct {
+       totalQPS [operateHotspotDataNumber]float64
+}
+
+// Receive statistical values of all kinds of reads and writes, and by 
aggregating them we
+// can obtain the overall statistics of reads and writes.
+func calculateHotspotStats(appMap appStatsMap) 
map[partitionAnalyzerKey][]hotspotPartitionStats {
+       results := make(map[partitionAnalyzerKey][]hotspotPartitionStats)
+       for appID, stats := range appMap {
+               partitionCount := len(stats.partitionStats)
+               value := make([]hotspotPartitionStats, 0, partitionCount)
+
+               for _, partitionStats := range stats.partitionStats {
+                       var hotspotStats hotspotPartitionStats
+
+                       // Calculate total QPS over all kinds of reads.
+                       for _, metricName := range readMetricNames {
+                               hotspotStats.totalQPS[readHotspotData] += 
partitionStats[metricName]
+                       }
+
+                       // Calculate total QPS over all kinds of writes.
+                       for _, metricName := range writeMetricNames {
+                               hotspotStats.totalQPS[writeHotspotData] += 
partitionStats[metricName]
+                       }
+
+                       value = append(value, hotspotStats)
+               }
+
+               key := partitionAnalyzerKey{appID: appID, partitionCount: 
int32(partitionCount)}
+               results[key] = value
+       }
+
+       return results
+}
+
+// Calculate statistical values over multiple tables with all partitions of 
each table as
+// a sample used for future analysis of hotspot partitions.
+func (d *partitionDetectorImpl) addHotspotSamples(appMap appStatsMap) {
+       hotspotMap := calculateHotspotStats(appMap)
+
+       d.mtx.Lock()
+       defer d.mtx.Unlock()
+
+       for key, value := range hotspotMap {
+               analyzer, ok := d.analyzers[key]
+               if !ok {
+                       analyzer = newPartitionAnalyzer(d.cfg.MaxSampleSize)
+                       d.analyzers[key] = analyzer
+               }
+
+               analyzer.addSample(value)
+       }
+}
+
+func newPartitionAnalyzer(maxSampleSize int) *partitionAnalyzer {
+       return &partitionAnalyzer{maxSampleSize: maxSampleSize}
+}
+
+// partitionAnalyzer holds the samples for all partitions of a table and 
analyzes hotspot
+// partitions based on them.
+type partitionAnalyzer struct {
+       // TODO(wangdan): bump gammazero/deque to the latest version after 
upgrading Go to 1.23+,
+       // since older Go versions do not support the `Deque.Iter()` iterator 
interface.
+       maxSampleSize int
+       samples       deque.Deque[[]hotspotPartitionStats] // Each element is a 
sample of all partitions of the table
+}
+
+func (a *partitionAnalyzer) addSample(sample []hotspotPartitionStats) {
+       for a.samples.Len() >= a.maxSampleSize {
+               a.samples.PopFront()
+       }
+
+       a.samples.PushBack(sample)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to