This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 022854b02 feat(collector): analyze statistics  to determine whether 
partitions are hotspots (#2367)
022854b02 is described below

commit 022854b0259ffefdfe91f8c3af268878b2f979b6
Author: Dan Wang <[email protected]>
AuthorDate: Wed Feb 11 17:50:29 2026 +0800

    feat(collector): analyze statistics  to determine whether partitions are 
hotspots (#2367)
    
    https://github.com/apache/incubator-pegasus/issues/2358
    
    Based on the [Z-score](https://en.wikipedia.org/wiki/Standard_score) method,
    hotspot scores are calculated for each partition using historical samples 
of total
    read/write QPS. A partition is considered a hotspot once both its score and 
QPS
    exceed the configured thresholds.
---
 collector/config.yml                    |   8 +-
 collector/hotspot/partition_detector.go | 207 +++++++++++++++++++++++++++-----
 2 files changed, 181 insertions(+), 34 deletions(-)

diff --git a/collector/config.yml b/collector/config.yml
index 1d63057ee..d489ab0b7 100644
--- a/collector/config.yml
+++ b/collector/config.yml
@@ -19,7 +19,7 @@
 cluster_name : "onebox"
 
 # the meta server addresses of the cluster.
-meta_servers: 
+meta_servers:
   - 127.0.0.1:34601
   - 127.0.0.1:34602
   - 127.0.0.1:34603
@@ -29,12 +29,12 @@ port : 34101
 
 metrics:
   # use falcon as monitoring system.
-  sink : falcon 
+  sink : falcon
   report_interval : 10s
 
 prometheus:
   # the exposed port for prometheus exposer
-  exposer_port : 1111 
+  exposer_port : 1111
 
 falcon_agent:
   # the host IP of falcon agent
@@ -53,3 +53,5 @@ hotspot:
   pull_metrics_timeout : 5s
   sample_metrics_interval : 10s
   max_sample_size : 128
+  hotspot_partition_min_score: 3
+  hotspot_partition_min_qps: 100
diff --git a/collector/hotspot/partition_detector.go 
b/collector/hotspot/partition_detector.go
index 718743d9d..8e0e0a8be 100644
--- a/collector/hotspot/partition_detector.go
+++ b/collector/hotspot/partition_detector.go
@@ -20,6 +20,7 @@ package hotspot
 import (
        "context"
        "fmt"
+       "math"
        "strconv"
        "sync"
        "time"
@@ -40,22 +41,26 @@ type PartitionDetector interface {
 }
 
 type PartitionDetectorConfig struct {
-       MetaServers           []string
-       RpcTimeout            time.Duration
-       DetectInterval        time.Duration
-       PullMetricsTimeout    time.Duration
-       SampleMetricsInterval time.Duration
-       MaxSampleSize         int
+       MetaServers              []string
+       RpcTimeout               time.Duration
+       DetectInterval           time.Duration
+       PullMetricsTimeout       time.Duration
+       SampleMetricsInterval    time.Duration
+       MaxSampleSize            int
+       HotspotPartitionMinScore float64
+       HotspotPartitionMinQPS   float64
 }
 
 func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
        return &PartitionDetectorConfig{
-               MetaServers:           viper.GetStringSlice("meta_servers"),
-               RpcTimeout:            viper.GetDuration("hotspot.rpc_timeout"),
-               DetectInterval:        
viper.GetDuration("hotspot.partition_detect_interval"),
-               PullMetricsTimeout:    
viper.GetDuration("hotspot.pull_metrics_timeout"),
-               SampleMetricsInterval: 
viper.GetDuration("hotspot.sample_metrics_interval"),
-               MaxSampleSize:         viper.GetInt("hotspot.max_sample_size"),
+               MetaServers:              viper.GetStringSlice("meta_servers"),
+               RpcTimeout:               
viper.GetDuration("hotspot.rpc_timeout"),
+               DetectInterval:           
viper.GetDuration("hotspot.partition_detect_interval"),
+               PullMetricsTimeout:       
viper.GetDuration("hotspot.pull_metrics_timeout"),
+               SampleMetricsInterval:    
viper.GetDuration("hotspot.sample_metrics_interval"),
+               MaxSampleSize:            
viper.GetInt("hotspot.max_sample_size"),
+               HotspotPartitionMinScore: 
viper.GetFloat64("hotspot.hotspot_partition_min_score"),
+               HotspotPartitionMinQPS:   
viper.GetFloat64("hotspot.hotspot_partition_min_qps"),
        }
 }
 
@@ -81,6 +86,18 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig) 
(PartitionDetector, erro
                        cfg.DetectInterval, cfg.SampleMetricsInterval)
        }
 
+       if cfg.MaxSampleSize <= 0 {
+               return nil, fmt.Errorf("MaxSampleSize(%d) must be > 0", 
cfg.MaxSampleSize)
+       }
+
+       if cfg.HotspotPartitionMinScore <= 0 {
+               return nil, fmt.Errorf("HotspotPartitionMinScore(%f) must be > 
0", cfg.HotspotPartitionMinScore)
+       }
+
+       if cfg.HotspotPartitionMinQPS <= 0 {
+               return nil, fmt.Errorf("HotspotPartitionMinQPS (%f) must be > 
0", cfg.HotspotPartitionMinQPS)
+       }
+
        return &partitionDetectorImpl{
                cfg:       cfg,
                analyzers: make(map[partitionAnalyzerKey]*partitionAnalyzer),
@@ -109,10 +126,14 @@ func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error 
{
 }
 
 func (d *partitionDetectorImpl) detect() {
-       err := d.aggregate()
+       appMap, err := d.aggregate()
        if err != nil {
                log.Error("failed to aggregate metrics for hotspot: ", err)
        }
+
+       log.Debugf("stats=%v", appMap)
+
+       d.analyse(appMap)
 }
 
 // {appID -> appStats}.
@@ -125,21 +146,19 @@ type appStats struct {
        partitionStats   []map[string]float64 // {metricName -> metricValue} 
for each partition.
 }
 
-func (d *partitionDetectorImpl) aggregate() error {
+func (d *partitionDetectorImpl) aggregate() (appStatsMap, error) {
        // appMap includes the structures that hold all the final statistical 
values.
        appMap, nodes, err := d.fetchMetadata()
        if err != nil {
-               return err
+               return nil, err
        }
 
        err = d.aggregateMetrics(appMap, nodes)
        if err != nil {
-               return err
+               return nil, err
        }
 
-       d.addHotspotSamples(appMap)
-
-       return nil
+       return appMap, nil
 }
 
 // Fetch necessary metadata from meta server for the aggregation of metrics, 
including:
@@ -217,7 +236,7 @@ func (d *partitionDetectorImpl) aggregateMetrics(appMap 
appStatsMap, nodes []*ad
                                snapshot.TimestampNS, 
startSnapshots[i].TimestampNS)
                }
 
-               d.calculateStats(snapshot, nodes[i],
+               calculateStats(snapshot, nodes[i],
                        func(stats map[string]float64, key string, operand 
float64) {
                                // Just set the number of requests with ending 
snapshot.
                                stats[key] = operand
@@ -226,7 +245,7 @@ func (d *partitionDetectorImpl) aggregateMetrics(appMap 
appStatsMap, nodes []*ad
        }
 
        for i, snapshot := range startSnapshots {
-               d.calculateStats(snapshot, nodes[i],
+               calculateStats(snapshot, nodes[i],
                        func(duration time.Duration) aggregator {
                                return func(stats map[string]float64, key 
string, operand float64) {
                                        value, ok := stats[key]
@@ -315,7 +334,7 @@ func (d *partitionDetectorImpl) pullMetrics(nodes 
[]*admin.NodeInfo) ([]*metrics
        return results, nil
 }
 
-func (d *partitionDetectorImpl) calculateStats(
+func calculateStats(
        snapshot *metrics.MetricQueryBriefValueSnapshot,
        node *admin.NodeInfo,
        adder aggregator,
@@ -415,8 +434,9 @@ func calculateHotspotStats(appMap appStatsMap) 
map[partitionAnalyzerKey][]hotspo
 }
 
 // Calculate statistical values over multiples tables with all partitions of 
each table as
-// a sample used for future analysis of hotspot partitions.
-func (d *partitionDetectorImpl) addHotspotSamples(appMap appStatsMap) {
+// a sample, and analyse all samples of each table asynchronously to decide 
which partitions
+// of it are hotspots.
+func (d *partitionDetectorImpl) analyse(appMap appStatsMap) {
        hotspotMap := calculateHotspotStats(appMap)
 
        d.mtx.Lock()
@@ -425,16 +445,37 @@ func (d *partitionDetectorImpl) addHotspotSamples(appMap 
appStatsMap) {
        for key, value := range hotspotMap {
                analyzer, ok := d.analyzers[key]
                if !ok {
-                       analyzer = newPartitionAnalyzer(d.cfg.MaxSampleSize)
+                       analyzer = newPartitionAnalyzer(
+                               d.cfg.MaxSampleSize,
+                               d.cfg.HotspotPartitionMinScore,
+                               d.cfg.HotspotPartitionMinQPS,
+                               key.appID,
+                               key.partitionCount,
+                       )
                        d.analyzers[key] = analyzer
                }
 
-               analyzer.addSample(value)
+               analyzer.add(value)
+
+               // Perform the analysis asynchronously.
+               go analyzer.analyse()
        }
 }
 
-func newPartitionAnalyzer(maxSampleSize int) *partitionAnalyzer {
-       return &partitionAnalyzer{maxSampleSize: maxSampleSize}
+func newPartitionAnalyzer(
+       maxSampleSize int,
+       hotspotPartitionMinScore float64,
+       hotspotPartitionMinQPS float64,
+       appID int32,
+       partitionCount int32,
+) *partitionAnalyzer {
+       return &partitionAnalyzer{
+               maxSampleSize:            maxSampleSize,
+               hotspotPartitionMinScore: hotspotPartitionMinScore,
+               hotspotPartitionMinQPS:   hotspotPartitionMinQPS,
+               appID:                    appID,
+               partitionCount:           partitionCount,
+       }
 }
 
 // partitionAnalyzer holds the samples for all partitions of a table and 
analyses hotspot
@@ -442,14 +483,118 @@ func newPartitionAnalyzer(maxSampleSize int) 
*partitionAnalyzer {
 type partitionAnalyzer struct {
        // TODO(wangdan): bump gammazero/deque to the lastest version after 
upgrading Go to 1.23+,
        // since older Go versions do not support the `Deque.Iter()` iterator 
interface.
-       maxSampleSize int
-       samples       deque.Deque[[]hotspotPartitionStats] // Each element is a 
sample of all partitions of the table
+       maxSampleSize            int
+       hotspotPartitionMinScore float64
+       hotspotPartitionMinQPS   float64
+       appID                    int32
+       partitionCount           int32
+       mtx                      sync.RWMutex
+       samples                  deque.Deque[[]hotspotPartitionStats] // Each 
element is a sample of all partitions of the table
 }
 
-func (a *partitionAnalyzer) addSample(sample []hotspotPartitionStats) {
+func (a *partitionAnalyzer) add(sample []hotspotPartitionStats) {
+       a.mtx.Lock()
+       defer a.mtx.Unlock()
+
        for a.samples.Len() >= a.maxSampleSize {
                a.samples.PopFront()
        }
 
        a.samples.PushBack(sample)
+       log.Debugf("appID=%d, partitionCount=%d, samples=%v", a.appID, 
a.partitionCount, a.samples)
+}
+
+func (a *partitionAnalyzer) analyse() {
+       a.mtx.RLock()
+       defer a.mtx.RUnlock()
+
+       a.analyseHotspots(readHotspotData)
+       a.analyseHotspots(writeHotspotData)
+}
+
+func (a *partitionAnalyzer) analyseHotspots(operationType int) {
+       sample, scores := a.calculateScores(operationType)
+       if len(scores) == 0 {
+               return
+       }
+
+       hotspotCount := a.countHotspots(operationType, sample, scores)
+
+       // TODO(wangdan): export the hotspot-related metrics for collection by 
monitoring
+       // systems such as Prometheus.
+       log.Infof("appID=%d, partitionCount=%d, operationType=%d, 
hotspotPartitions=%d, scores=%v",
+               a.appID, a.partitionCount, operationType, hotspotCount, scores)
+}
+
+// Calculates [Z-score](https://en.wikipedia.org/wiki/Standard_score) for each 
partition by
+// comparing historical data vertically and concurrent data horizontally to 
describe the
+// hotspots.
+func (a *partitionAnalyzer) calculateScores(
+       operationType int,
+) (
+       []hotspotPartitionStats,
+       []float64,
+) {
+       var count int
+       var partitionQPSSum float64
+       // TODO(wangdan): use `range a.samples.Iter()` instead for Go 1.23+.
+       for i, n := 0, a.samples.Len(); i < n; i++ {
+               sample := a.samples.At(i)
+               count += len(sample)
+               for _, stats := range sample {
+                       partitionQPSSum += stats.totalQPS[operationType]
+               }
+       }
+
+       if count <= 1 {
+               log.Infof("sample size(%d) <= 1, not enough data for 
calculation", count)
+               return nil, nil
+       }
+
+       partitionQPSAvg := partitionQPSSum / float64(count)
+
+       var standardDeviation float64
+       // TODO(wangdan): use `range a.samples.Iter()` instead for Go 1.23+.
+       for i, n := 0, a.samples.Len(); i < n; i++ {
+               for _, stats := range a.samples.At(i) {
+                       deviation := stats.totalQPS[operationType] - 
partitionQPSAvg
+                       standardDeviation += deviation * deviation
+               }
+       }
+
+       standardDeviation = math.Sqrt(standardDeviation / float64(count-1))
+
+       sample := a.samples.Back()
+       scores := make([]float64, 0, len(sample))
+       for i := 0; i < len(sample); i++ {
+               if standardDeviation == 0 {
+                       scores = append(scores, 0)
+                       continue
+               }
+
+               score := (sample[i].totalQPS[operationType] - partitionQPSAvg) 
/ standardDeviation
+               scores = append(scores, score)
+       }
+
+       return sample, scores
+}
+
+func (a *partitionAnalyzer) countHotspots(
+       operationType int,
+       sample []hotspotPartitionStats,
+       scores []float64,
+) (hotspotCount int) {
+       for i, score := range scores {
+               if score < a.hotspotPartitionMinScore {
+                       continue
+               }
+
+               if sample[i].totalQPS[operationType] < a.hotspotPartitionMinQPS 
{
+                       continue
+               }
+
+               hotspotCount++
+       }
+
+       return
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to