This is an automated email from the ASF dual-hosted git repository.

gehafearless pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 38fa72ecf feat(collector): introduce the hotspot detector into go-collector (#1943)
38fa72ecf is described below

commit 38fa72ecf767ac141954d00f34ecb0b8cd15222b
Author: Dan Wang <[email protected]>
AuthorDate: Thu Mar 14 11:46:55 2024 +0800

    feat(collector): introduce the hotspot detector into go-collector (#1943)
---
 collector/avail/detector.go                        |  4 +--
 collector/config.yml                               |  3 ++
 .../hotspot/{algo.go => partition_detector.go}     | 40 ++++++++++++++++++++++
 collector/main.go                                  | 27 ++++++++++++---
 collector/metrics/metric_collector.go              |  4 +--
 5 files changed, 69 insertions(+), 9 deletions(-)

diff --git a/collector/avail/detector.go b/collector/avail/detector.go
index 2b93da9f2..d6ddfc27a 100644
--- a/collector/avail/detector.go
+++ b/collector/avail/detector.go
@@ -34,7 +34,7 @@ import (
 
 // Detector periodically checks the service availability of the Pegasus cluster.
 type Detector interface {
-       Start(tom *tomb.Tomb) error
+       Run(tom *tomb.Tomb) error
 }
 
 // NewDetector returns a service-availability detector.
@@ -96,7 +96,7 @@ type pegasusDetector struct {
        partitionCount int
 }
 
-func (d *pegasusDetector) Start(tom *tomb.Tomb) error {
+func (d *pegasusDetector) Run(tom *tomb.Tomb) error {
        var err error
        // Open the detect table.
        d.detectTable, err = d.client.OpenTable(context.Background(), d.detectTableName)
diff --git a/collector/config.yml b/collector/config.yml
index 1ff2d10e9..5075cf8d5 100644
--- a/collector/config.yml
+++ b/collector/config.yml
@@ -44,3 +44,6 @@ falcon_agent:
 
 availablity_detect:
   table_name : test
+
+hotspot:
+  partition_detect_interval : 10s
diff --git a/collector/hotspot/algo.go b/collector/hotspot/partition_detector.go
similarity index 54%
rename from collector/hotspot/algo.go
rename to collector/hotspot/partition_detector.go
index 6b24419cf..a1b55ef5a 100644
--- a/collector/hotspot/algo.go
+++ b/collector/hotspot/partition_detector.go
@@ -16,3 +16,43 @@
 // under the License.
 
 package hotspot
+
+import (
+       "time"
+
+       log "github.com/sirupsen/logrus"
+       "gopkg.in/tomb.v2"
+)
+
+type PartitionDetector interface {
+       Run(tom *tomb.Tomb) error
+}
+
+type PartitionDetectorConfig struct {
+       DetectInterval time.Duration
+}
+
+func NewPartitionDetector(conf PartitionDetectorConfig) PartitionDetector {
+       return &partitionDetector{
+               detectInterval: conf.DetectInterval,
+       }
+}
+
+type partitionDetector struct {
+       detectInterval time.Duration
+}
+
+func (d *partitionDetector) Run(tom *tomb.Tomb) error {
+       for {
+               select {
+               case <-time.After(d.detectInterval):
+                       d.detect()
+               case <-tom.Dying():
+                       log.Info("Hotspot partition detector exited.")
+                       return nil
+               }
+       }
+}
+
+func (d *partitionDetector) detect() {
+}
diff --git a/collector/main.go b/collector/main.go
index efc2fc98b..7936036d5 100644
--- a/collector/main.go
+++ b/collector/main.go
@@ -27,6 +27,7 @@ import (
        "syscall"
 
        "github.com/apache/incubator-pegasus/collector/avail"
+       "github.com/apache/incubator-pegasus/collector/hotspot"
        "github.com/apache/incubator-pegasus/collector/metrics"
        "github.com/apache/incubator-pegasus/collector/webui"
        "github.com/prometheus/client_golang/prometheus"
@@ -87,18 +88,34 @@ func main() {
 
        tom := &tomb.Tomb{}
        setupSignalHandler(func() {
-               tom.Kill(errors.New("collector terminates")) // kill other goroutines
+               tom.Kill(errors.New("Collector terminates")) // kill other goroutines
        })
+
        tom.Go(func() error {
                // Set detect inteverl and detect timeout 10s.
-               return avail.NewDetector(10000000000, 10000000000, 16).Start(tom)
+               return avail.NewDetector(10000000000, 10000000000, 16).Run(tom)
        })
+
        tom.Go(func() error {
-               return metrics.NewMetaServerMetricCollector().Start(tom)
+               return metrics.NewMetaServerMetricCollector().Run(tom)
        })
+
        tom.Go(func() error {
-               return metrics.NewReplicaServerMetricCollector().Start(tom)
+               return metrics.NewReplicaServerMetricCollector().Run(tom)
        })
 
-       <-tom.Dead() // gracefully wait until all goroutines dead
+       tom.Go(func() error {
+               conf := hotspot.PartitionDetectorConfig{
+                       DetectInterval: viper.GetDuration("hotspot.partition_detect_interval"),
+               }
+               return hotspot.NewPartitionDetector(conf).Run(tom)
+       })
+
+       err := tom.Wait()
+       if err != nil {
+               log.Error("Collector exited abnormally:", err)
+               return
+       }
+
+       log.Info("Collector exited normally.")
 }
diff --git a/collector/metrics/metric_collector.go b/collector/metrics/metric_collector.go
index 9e6f57bbb..6f7fcee78 100644
--- a/collector/metrics/metric_collector.go
+++ b/collector/metrics/metric_collector.go
@@ -56,7 +56,7 @@ var SummaryMetricsMap map[string]prometheus.Summary
 var TableNameByID map[string]string
 
 type MetricCollector interface {
-       Start(tom *tomb.Tomb) error
+       Run(tom *tomb.Tomb) error
 }
 
 func NewMetricCollector(
@@ -79,7 +79,7 @@ type Collector struct {
        role           string
 }
 
-func (collector *Collector) Start(tom *tomb.Tomb) error {
+func (collector *Collector) Run(tom *tomb.Tomb) error {
        ticker := time.NewTicker(collector.detectInterval)
        for {
                select {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to