This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 022854b02 feat(collector): analyze statistics to determine whether partitions are hotspots (#2367)
022854b02 is described below
commit 022854b0259ffefdfe91f8c3af268878b2f979b6
Author: Dan Wang <[email protected]>
AuthorDate: Wed Feb 11 17:50:29 2026 +0800
feat(collector): analyze statistics to determine whether partitions are hotspots (#2367)
https://github.com/apache/incubator-pegasus/issues/2358
Based on the [Z-score](https://en.wikipedia.org/wiki/Standard_score) method,
hotspot scores are calculated for each partition using historical samples of
total read/write QPS. A partition is considered a hotspot once both its score
and QPS exceed the configured thresholds.
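For illustration only (this sketch is not part of the commit), here is a minimal, self-contained Go example of the scoring rule described above: take the mean and sample standard deviation over the historical per-partition QPS observations, derive a Z-score for each partition in the latest sample, and flag a partition when both its score and its QPS clear the thresholds (the defaults added to config.yml are 3 and 100). The function name `zScores` and the sample data are made up for the example.

```go
package main

import (
	"fmt"
	"math"
)

// zScores computes a Z-score for each value in the latest sample against the
// mean and sample standard deviation of all historical QPS observations.
func zScores(history, latest []float64) []float64 {
	if len(history) <= 1 {
		return nil // not enough data, mirroring the "count <= 1" guard in the commit
	}

	var sum float64
	for _, v := range history {
		sum += v
	}
	mean := sum / float64(len(history))

	var sqDev float64
	for _, v := range history {
		d := v - mean
		sqDev += d * d
	}
	stddev := math.Sqrt(sqDev / float64(len(history)-1))

	scores := make([]float64, len(latest))
	for i, v := range latest {
		if stddev == 0 {
			scores[i] = 0 // all observations identical: no partition stands out
			continue
		}
		scores[i] = (v - mean) / stddev
	}
	return scores
}

func main() {
	// Hypothetical history: 15 observations around 100 QPS and one at 1000 QPS.
	history := make([]float64, 15, 16)
	for i := range history {
		history[i] = 100
	}
	history = append(history, 1000)

	// Latest sample: two partitions of the same table.
	latest := []float64{100, 1000}

	const minScore, minQPS = 3.0, 100.0 // defaults introduced in config.yml
	for i, score := range zScores(history, latest) {
		hot := score >= minScore && latest[i] >= minQPS
		fmt.Printf("partition %d: qps=%.0f score=%.2f hotspot=%v\n", i, latest[i], score, hot)
	}
}
```

With this data the second partition scores 3.75 and is reported as a hotspot, while the first scores -0.25 and is not.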
---
collector/config.yml | 8 +-
collector/hotspot/partition_detector.go | 207 +++++++++++++++++++++++++++-----
2 files changed, 181 insertions(+), 34 deletions(-)
diff --git a/collector/config.yml b/collector/config.yml
index 1d63057ee..d489ab0b7 100644
--- a/collector/config.yml
+++ b/collector/config.yml
@@ -19,7 +19,7 @@
cluster_name : "onebox"
# the meta server addresses of the cluster.
-meta_servers:
+meta_servers:
- 127.0.0.1:34601
- 127.0.0.1:34602
- 127.0.0.1:34603
@@ -29,12 +29,12 @@ port : 34101
metrics:
# use falcon as monitoring system.
- sink : falcon
+ sink : falcon
report_interval : 10s
prometheus:
# the exposed port for prometheus exposer
- exposer_port : 1111
+ exposer_port : 1111
falcon_agent:
# the host IP of falcon agent
@@ -53,3 +53,5 @@ hotspot:
pull_metrics_timeout : 5s
sample_metrics_interval : 10s
max_sample_size : 128
+ hotspot_partition_min_score: 3
+ hotspot_partition_min_qps: 100
diff --git a/collector/hotspot/partition_detector.go b/collector/hotspot/partition_detector.go
index 718743d9d..8e0e0a8be 100644
--- a/collector/hotspot/partition_detector.go
+++ b/collector/hotspot/partition_detector.go
@@ -20,6 +20,7 @@ package hotspot
import (
"context"
"fmt"
+ "math"
"strconv"
"sync"
"time"
@@ -40,22 +41,26 @@ type PartitionDetector interface {
}
type PartitionDetectorConfig struct {
- MetaServers           []string
- RpcTimeout            time.Duration
- DetectInterval        time.Duration
- PullMetricsTimeout    time.Duration
- SampleMetricsInterval time.Duration
- MaxSampleSize         int
+ MetaServers              []string
+ RpcTimeout               time.Duration
+ DetectInterval           time.Duration
+ PullMetricsTimeout       time.Duration
+ SampleMetricsInterval    time.Duration
+ MaxSampleSize            int
+ HotspotPartitionMinScore float64
+ HotspotPartitionMinQPS   float64
}
func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
return &PartitionDetectorConfig{
- MetaServers:           viper.GetStringSlice("meta_servers"),
- RpcTimeout:            viper.GetDuration("hotspot.rpc_timeout"),
- DetectInterval:        viper.GetDuration("hotspot.partition_detect_interval"),
- PullMetricsTimeout:    viper.GetDuration("hotspot.pull_metrics_timeout"),
- SampleMetricsInterval: viper.GetDuration("hotspot.sample_metrics_interval"),
- MaxSampleSize:         viper.GetInt("hotspot.max_sample_size"),
+ MetaServers:              viper.GetStringSlice("meta_servers"),
+ RpcTimeout:               viper.GetDuration("hotspot.rpc_timeout"),
+ DetectInterval:           viper.GetDuration("hotspot.partition_detect_interval"),
+ PullMetricsTimeout:       viper.GetDuration("hotspot.pull_metrics_timeout"),
+ SampleMetricsInterval:    viper.GetDuration("hotspot.sample_metrics_interval"),
+ MaxSampleSize:            viper.GetInt("hotspot.max_sample_size"),
+ HotspotPartitionMinScore: viper.GetFloat64("hotspot.hotspot_partition_min_score"),
+ HotspotPartitionMinQPS:   viper.GetFloat64("hotspot.hotspot_partition_min_qps"),
}
}
@@ -81,6 +86,18 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig) (PartitionDetector, erro
cfg.DetectInterval, cfg.SampleMetricsInterval)
}
+ if cfg.MaxSampleSize <= 0 {
+ return nil, fmt.Errorf("MaxSampleSize(%d) must be > 0", cfg.MaxSampleSize)
+ }
+
+ if cfg.HotspotPartitionMinScore <= 0 {
+ return nil, fmt.Errorf("HotspotPartitionMinScore(%f) must be > 0", cfg.HotspotPartitionMinScore)
+ }
+
+ if cfg.HotspotPartitionMinQPS <= 0 {
+ return nil, fmt.Errorf("HotspotPartitionMinQPS (%f) must be > 0", cfg.HotspotPartitionMinQPS)
+ }
+
return &partitionDetectorImpl{
cfg: cfg,
analyzers: make(map[partitionAnalyzerKey]*partitionAnalyzer),
@@ -109,10 +126,14 @@ func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
}
func (d *partitionDetectorImpl) detect() {
- err := d.aggregate()
+ appMap, err := d.aggregate()
if err != nil {
log.Error("failed to aggregate metrics for hotspot: ", err)
}
+
+ log.Debugf("stats=%v", appMap)
+
+ d.analyse(appMap)
}
// {appID -> appStats}.
@@ -125,21 +146,19 @@ type appStats struct {
partitionStats []map[string]float64 // {metricName -> metricValue} for each partition.
}
-func (d *partitionDetectorImpl) aggregate() error {
+func (d *partitionDetectorImpl) aggregate() (appStatsMap, error) {
// appMap includes the structures that hold all the final statistical values.
appMap, nodes, err := d.fetchMetadata()
if err != nil {
- return err
+ return nil, err
}
err = d.aggregateMetrics(appMap, nodes)
if err != nil {
- return err
+ return nil, err
}
- d.addHotspotSamples(appMap)
-
- return nil
+ return appMap, nil
}
// Fetch necessary metadata from meta server for the aggregation of metrics, including:
@@ -217,7 +236,7 @@ func (d *partitionDetectorImpl) aggregateMetrics(appMap appStatsMap, nodes []*ad
snapshot.TimestampNS,
startSnapshots[i].TimestampNS)
}
- d.calculateStats(snapshot, nodes[i],
+ calculateStats(snapshot, nodes[i],
func(stats map[string]float64, key string, operand float64) {
// Just set the number of requests with ending snapshot.
stats[key] = operand
@@ -226,7 +245,7 @@ func (d *partitionDetectorImpl) aggregateMetrics(appMap appStatsMap, nodes []*ad
}
for i, snapshot := range startSnapshots {
- d.calculateStats(snapshot, nodes[i],
+ calculateStats(snapshot, nodes[i],
func(duration time.Duration) aggregator {
return func(stats map[string]float64, key string, operand float64) {
value, ok := stats[key]
@@ -315,7 +334,7 @@ func (d *partitionDetectorImpl) pullMetrics(nodes []*admin.NodeInfo) ([]*metrics
return results, nil
}
-func (d *partitionDetectorImpl) calculateStats(
+func calculateStats(
snapshot *metrics.MetricQueryBriefValueSnapshot,
node *admin.NodeInfo,
adder aggregator,
@@ -415,8 +434,9 @@ func calculateHotspotStats(appMap appStatsMap) map[partitionAnalyzerKey][]hotspo
}
// Calculate statistical values over multiples tables with all partitions of each table as
-// a sample used for future analysis of hotspot partitions.
-func (d *partitionDetectorImpl) addHotspotSamples(appMap appStatsMap) {
+// a sample, and analyse all samples of each table asynchronously to decide which partitions
+// of it are hotspots.
+func (d *partitionDetectorImpl) analyse(appMap appStatsMap) {
hotspotMap := calculateHotspotStats(appMap)
d.mtx.Lock()
@@ -425,16 +445,37 @@ func (d *partitionDetectorImpl) addHotspotSamples(appMap appStatsMap) {
for key, value := range hotspotMap {
analyzer, ok := d.analyzers[key]
if !ok {
- analyzer = newPartitionAnalyzer(d.cfg.MaxSampleSize)
+ analyzer = newPartitionAnalyzer(
+ d.cfg.MaxSampleSize,
+ d.cfg.HotspotPartitionMinScore,
+ d.cfg.HotspotPartitionMinQPS,
+ key.appID,
+ key.partitionCount,
+ )
d.analyzers[key] = analyzer
}
- analyzer.addSample(value)
+ analyzer.add(value)
+
+ // Perform the analysis asynchronously.
+ go analyzer.analyse()
}
}
-func newPartitionAnalyzer(maxSampleSize int) *partitionAnalyzer {
- return &partitionAnalyzer{maxSampleSize: maxSampleSize}
+func newPartitionAnalyzer(
+ maxSampleSize int,
+ hotspotPartitionMinScore float64,
+ hotspotPartitionMinQPS float64,
+ appID int32,
+ partitionCount int32,
+) *partitionAnalyzer {
+ return &partitionAnalyzer{
+ maxSampleSize: maxSampleSize,
+ hotspotPartitionMinScore: hotspotPartitionMinScore,
+ hotspotPartitionMinQPS: hotspotPartitionMinQPS,
+ appID: appID,
+ partitionCount: partitionCount,
+ }
}
// partitionAnalyzer holds the samples for all partitions of a table and analyses hotspot
@@ -442,14 +483,118 @@ func newPartitionAnalyzer(maxSampleSize int) *partitionAnalyzer {
type partitionAnalyzer struct {
// TODO(wangdan): bump gammazero/deque to the lastest version after upgrading Go to 1.23+,
// since older Go versions do not support the `Deque.Iter()` iterator interface.
- maxSampleSize int
- samples       deque.Deque[[]hotspotPartitionStats] // Each element is a sample of all partitions of the table
+ maxSampleSize            int
+ hotspotPartitionMinScore float64
+ hotspotPartitionMinQPS   float64
+ appID                    int32
+ partitionCount           int32
+ mtx                      sync.RWMutex
+ samples                  deque.Deque[[]hotspotPartitionStats] // Each element is a sample of all partitions of the table
}
-func (a *partitionAnalyzer) addSample(sample []hotspotPartitionStats) {
+func (a *partitionAnalyzer) add(sample []hotspotPartitionStats) {
+ a.mtx.Lock()
+ defer a.mtx.Unlock()
+
for a.samples.Len() >= a.maxSampleSize {
a.samples.PopFront()
}
a.samples.PushBack(sample)
+ log.Debugf("appID=%d, partitionCount=%d, samples=%v", a.appID, a.partitionCount, a.samples)
+}
+
+func (a *partitionAnalyzer) analyse() {
+ a.mtx.RLock()
+ defer a.mtx.RUnlock()
+
+ a.analyseHotspots(readHotspotData)
+ a.analyseHotspots(writeHotspotData)
+}
+
+func (a *partitionAnalyzer) analyseHotspots(operationType int) {
+ sample, scores := a.calculateScores(operationType)
+ if len(scores) == 0 {
+ return
+ }
+
+ hotspotCount := a.countHotspots(operationType, sample, scores)
+
+ // TODO(wangdan): export the hotspot-related metrics for collection by monitoring
+ // systems such as Prometheus.
+ log.Infof("appID=%d, partitionCount=%d, operationType=%d, hotspotPartitions=%d, scores=%v",
+ a.appID, a.partitionCount, operationType, hotspotCount, scores)
+}
+
+// Calculates [Z-score](https://en.wikipedia.org/wiki/Standard_score) for each partition by
+// comparing historical data vertically and concurrent data horizontally to describe the
+// hotspots.
+func (a *partitionAnalyzer) calculateScores(
+ operationType int,
+) (
+ []hotspotPartitionStats,
+ []float64,
+) {
+ var count int
+ var partitionQPSSum float64
+ // TODO(wangdan): use `range a.samples.Iter()` instead for Go 1.23+.
+ for i, n := 0, a.samples.Len(); i < n; i++ {
+ sample := a.samples.At(i)
+ count += len(sample)
+ for _, stats := range sample {
+ partitionQPSSum += stats.totalQPS[operationType]
+ }
+ }
+
+ if count <= 1 {
+ log.Infof("sample size(%d) <= 1, not enough data for
calculation", count)
+ return nil, nil
+ }
+
+ partitionQPSAvg := partitionQPSSum / float64(count)
+
+ var standardDeviation float64
+ // TODO(wangdan): use `range a.samples.Iter()` instead for Go 1.23+.
+ for i, n := 0, a.samples.Len(); i < n; i++ {
+ for _, stats := range a.samples.At(i) {
+ deviation := stats.totalQPS[operationType] - partitionQPSAvg
+ standardDeviation += deviation * deviation
+ }
+ }
+
+ standardDeviation = math.Sqrt(standardDeviation / float64(count-1))
+
+ sample := a.samples.Back()
+ scores := make([]float64, 0, len(sample))
+ for i := 0; i < len(sample); i++ {
+ if standardDeviation == 0 {
+ scores = append(scores, 0)
+ continue
+ }
+
+ score := (sample[i].totalQPS[operationType] - partitionQPSAvg) / standardDeviation
+ scores = append(scores, score)
+ }
+
+ return sample, scores
+}
+
+func (a *partitionAnalyzer) countHotspots(
+ operationType int,
+ sample []hotspotPartitionStats,
+ scores []float64,
+) (hotspotCount int) {
+ for i, score := range scores {
+ if score < a.hotspotPartitionMinScore {
+ continue
+ }
+
+ if sample[i].totalQPS[operationType] < a.hotspotPartitionMinQPS {
+ continue
+ }
+
+ hotspotCount++
+ }
+
+ return
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]