This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new a9a11da88 feat(collector): calculate the total statistics for both
reads and writes and record them as history for analysis of hotspot partitions
(#2359)
a9a11da88 is described below
commit a9a11da886bd32d170fc166c08b69210d89d6a52
Author: Dan Wang <[email protected]>
AuthorDate: Thu Feb 5 15:16:44 2026 +0800
feat(collector): calculate the total statistics for both reads and writes
and record them as history for analysis of hotspot partitions (#2359)
https://github.com/apache/incubator-pegasus/issues/2358
Based on the aggregation results, calculate the total statistics for both
reads and
writes, and record them as history for analysis of hotspot partitions.
---
collector/config.yml | 1 +
collector/go.mod | 1 +
collector/go.sum | 2 +
collector/hotspot/partition_detector.go | 163 ++++++++++++++++++++++++++------
4 files changed, 138 insertions(+), 29 deletions(-)
diff --git a/collector/config.yml b/collector/config.yml
index cfd153f1d..1d63057ee 100644
--- a/collector/config.yml
+++ b/collector/config.yml
@@ -52,3 +52,4 @@ hotspot:
partition_detect_interval : 30s
pull_metrics_timeout : 5s
sample_metrics_interval : 10s
+ max_sample_size : 128
diff --git a/collector/go.mod b/collector/go.mod
index 6b6e2fa69..5aabb0b16 100644
--- a/collector/go.mod
+++ b/collector/go.mod
@@ -21,6 +21,7 @@ go 1.18
require (
github.com/apache/incubator-pegasus/go-client
v0.0.0-20260121121155-96868ed93b2a
+ github.com/gammazero/deque v1.0.0
github.com/kataras/iris/v12 v12.2.0
github.com/prometheus/client_golang v1.18.0
github.com/sirupsen/logrus v1.8.1
diff --git a/collector/go.sum b/collector/go.sum
index dfb81a2c4..95ebed26d 100644
--- a/collector/go.sum
+++ b/collector/go.sum
@@ -85,6 +85,8 @@ github.com/fortytw2/leaktest v1.3.0
h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8
github.com/fsnotify/fsnotify v1.4.7/go.mod
h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.5.4
h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod
h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
+github.com/gammazero/deque v1.0.0
h1:LTmimT8H7bXkkCy6gZX7zNLtkbz4NdS2z8LZuor3j34=
+github.com/gammazero/deque v1.0.0/go.mod
h1:iflpYvtGfM3U8S8j+sZEKIak3SAKYpA5/SQewgfXDKo=
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod
h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/ghodss/yaml v1.0.0/go.mod
h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod
h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
diff --git a/collector/hotspot/partition_detector.go
b/collector/hotspot/partition_detector.go
index 5238aa459..718743d9d 100644
--- a/collector/hotspot/partition_detector.go
+++ b/collector/hotspot/partition_detector.go
@@ -21,12 +21,14 @@ import (
"context"
"fmt"
"strconv"
+ "sync"
"time"
"github.com/apache/incubator-pegasus/collector/metrics"
client "github.com/apache/incubator-pegasus/go-client/admin"
"github.com/apache/incubator-pegasus/go-client/idl/admin"
"github.com/apache/incubator-pegasus/go-client/idl/replication"
+ "github.com/gammazero/deque"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
@@ -43,6 +45,7 @@ type PartitionDetectorConfig struct {
DetectInterval time.Duration
PullMetricsTimeout time.Duration
SampleMetricsInterval time.Duration
+ MaxSampleSize int
}
func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
@@ -52,6 +55,7 @@ func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
DetectInterval:
viper.GetDuration("hotspot.partition_detect_interval"),
PullMetricsTimeout:
viper.GetDuration("hotspot.pull_metrics_timeout"),
SampleMetricsInterval:
viper.GetDuration("hotspot.sample_metrics_interval"),
+ MaxSampleSize: viper.GetInt("hotspot.max_sample_size"),
}
}
@@ -78,12 +82,15 @@ func NewPartitionDetector(cfg *PartitionDetectorConfig)
(PartitionDetector, erro
}
return &partitionDetectorImpl{
- cfg: cfg,
+ cfg: cfg,
+ analyzers: make(map[partitionAnalyzerKey]*partitionAnalyzer),
}, nil
}
type partitionDetectorImpl struct {
- cfg *PartitionDetectorConfig
+ cfg *PartitionDetectorConfig
+ mtx sync.RWMutex
+ analyzers map[partitionAnalyzerKey]*partitionAnalyzer
}
func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
@@ -115,36 +122,42 @@ type appStats struct {
appName string
partitionCount int32
partitionConfigs []*replication.PartitionConfiguration
- partitionStats []map[string]float64 // {metric_name -> metric_value}
for each partition.
+ partitionStats []map[string]float64 // {metricName -> metricValue}
for each partition.
}
func (d *partitionDetectorImpl) aggregate() error {
- adminClient := client.NewClient(client.Config{
- MetaServers: d.cfg.MetaServers,
- Timeout: d.cfg.RpcTimeout,
- })
- defer adminClient.Close()
-
- // appMap is the final structure that includes all the statistical
values.
- appMap, err := pullTablePartitions(adminClient)
+ // appMap includes the structures that hold all the final statistical
values.
+ appMap, nodes, err := d.fetchMetadata()
if err != nil {
return err
}
- err = d.aggregateMetrics(adminClient, appMap)
+ err = d.aggregateMetrics(appMap, nodes)
if err != nil {
return err
}
+ d.addHotspotSamples(appMap)
+
return nil
}
-// Pull metadata of all available tables with all their partitions and form
the final structure
-// that includes all the statistical values.
-func pullTablePartitions(adminClient client.Client) (appStatsMap, error) {
+// Fetch necessary metadata from meta server for the aggregation of metrics,
including:
+// - the metadata of all available tables with all their partitions, and
+// - the node information of all replica servers.
+// Also, the returned appStatsMap includes the structures that hold all the
final
+// statistical values.
+func (d *partitionDetectorImpl) fetchMetadata() (appStatsMap,
[]*admin.NodeInfo, error) {
+ adminClient := client.NewClient(client.Config{
+ MetaServers: d.cfg.MetaServers,
+ Timeout: d.cfg.RpcTimeout,
+ })
+ defer adminClient.Close()
+
+ // Fetch the information of all available tables.
tables, err := adminClient.ListTables()
if err != nil {
- return nil, err
+ return nil, nil, err
}
appMap := make(appStatsMap)
@@ -152,7 +165,7 @@ func pullTablePartitions(adminClient client.Client)
(appStatsMap, error) {
// Query metadata for each partition of each table.
appID, partitionCount, partitionConfigs, err :=
adminClient.QueryConfig(table.AppName)
if err != nil {
- return nil, err
+ return nil, nil, err
}
// Initialize statistical value for each partition.
@@ -170,20 +183,22 @@ func pullTablePartitions(adminClient client.Client)
(appStatsMap, error) {
}
}
- return appMap, nil
-}
-
-type aggregator func(map[string]float64, string, float64)
-
-// Pull metrics from all nodes and aggregate them to produce the statistics.
-func (d *partitionDetectorImpl) aggregateMetrics(adminClient client.Client,
appMap appStatsMap) error {
+ // Fetch the node information of all replica servers.
nodes, err := adminClient.ListNodes()
if err != nil {
- return err
+ return nil, nil, err
}
- // Pull multiple results of metrics to perform cumulative calculation
to produce the
- // statistics such as QPS.
+ return appMap, nodes, nil
+}
+
+type aggregator func(map[string]float64, string, float64)
+
+// Pull metric samples from nodes and aggregate them to produce the final
statistical results
+// into appMap.
+func (d *partitionDetectorImpl) aggregateMetrics(appMap appStatsMap, nodes
[]*admin.NodeInfo) error {
+ // Pull multiple samples of metrics to perform cumulative calculation
to produce the
+ // statistical results such as QPS.
startSnapshots, err := d.pullMetrics(nodes)
if err != nil {
return err
@@ -204,7 +219,7 @@ func (d *partitionDetectorImpl)
aggregateMetrics(adminClient client.Client, appM
d.calculateStats(snapshot, nodes[i],
func(stats map[string]float64, key string, operand
float64) {
- // Just set the ending number of requests.
+ // Just set the number of requests with the ending
snapshot.
stats[key] = operand
},
appMap)
@@ -220,7 +235,7 @@ func (d *partitionDetectorImpl)
aggregateMetrics(adminClient client.Client, appM
return
}
- // Calculate QPS based on the ending
number of requests that have been
+ // Calculate QPS based on the ending
snapshot that has been
// set previously.
stats[key] = (value - operand) /
duration.Seconds()
}
@@ -348,3 +363,93 @@ func (d *partitionDetectorImpl) calculateStats(
}
}
}
+
+// Since the partition count of a table might change, use the (appID,
partitionCount)
+// pair as the key for each table.
+type partitionAnalyzerKey struct {
+ appID int32
+ partitionCount int32
+}
+
+const (
+ readHotspotData = iota
+ writeHotspotData
+ operateHotspotDataNumber
+)
+
+// hotspotPartitionStats holds all the statistical values of each partition,
used for analysis
+// of hotspot partitions.
+type hotspotPartitionStats struct {
+ totalQPS [operateHotspotDataNumber]float64
+}
+
+// Receive statistical values of all kinds of reads and writes, and by
aggregating them we
+// can obtain the overall statistics of reads and writes.
+func calculateHotspotStats(appMap appStatsMap)
map[partitionAnalyzerKey][]hotspotPartitionStats {
+ results := make(map[partitionAnalyzerKey][]hotspotPartitionStats)
+ for appID, stats := range appMap {
+ partitionCount := len(stats.partitionStats)
+ value := make([]hotspotPartitionStats, 0, partitionCount)
+
+ for _, partitionStats := range stats.partitionStats {
+ var hotspotStats hotspotPartitionStats
+
+ // Calculate total QPS over all kinds of reads.
+ for _, metricName := range readMetricNames {
+ hotspotStats.totalQPS[readHotspotData] +=
partitionStats[metricName]
+ }
+
+ // Calculate total QPS over all kinds of writes.
+ for _, metricName := range writeMetricNames {
+ hotspotStats.totalQPS[writeHotspotData] +=
partitionStats[metricName]
+ }
+
+ value = append(value, hotspotStats)
+ }
+
+ key := partitionAnalyzerKey{appID: appID, partitionCount:
int32(partitionCount)}
+ results[key] = value
+ }
+
+ return results
+}
+
+// Calculate statistical values over multiple tables with all partitions of
each table as
+// a sample used for future analysis of hotspot partitions.
+func (d *partitionDetectorImpl) addHotspotSamples(appMap appStatsMap) {
+ hotspotMap := calculateHotspotStats(appMap)
+
+ d.mtx.Lock()
+ defer d.mtx.Unlock()
+
+ for key, value := range hotspotMap {
+ analyzer, ok := d.analyzers[key]
+ if !ok {
+ analyzer = newPartitionAnalyzer(d.cfg.MaxSampleSize)
+ d.analyzers[key] = analyzer
+ }
+
+ analyzer.addSample(value)
+ }
+}
+
+func newPartitionAnalyzer(maxSampleSize int) *partitionAnalyzer {
+ return &partitionAnalyzer{maxSampleSize: maxSampleSize}
+}
+
+// partitionAnalyzer holds the samples for all partitions of a table and
analyses hotspot
+// partitions based on them.
+type partitionAnalyzer struct {
+ // TODO(wangdan): bump gammazero/deque to the latest version after
upgrading Go to 1.23+,
+ // since older Go versions do not support the `Deque.Iter()` iterator
interface.
+ maxSampleSize int
+ samples deque.Deque[[]hotspotPartitionStats] // Each element is a
sample of all partitions of the table
+}
+
+func (a *partitionAnalyzer) addSample(sample []hotspotPartitionStats) {
+ for a.samples.Len() >= a.maxSampleSize {
+ a.samples.PopFront()
+ }
+
+ a.samples.PushBack(sample)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]