This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b4c8cfb3 feat(collector): Go Collector fetches and aggregates 
hotspot-related metrics (#2357)
9b4c8cfb3 is described below

commit 9b4c8cfb3ecac5fd272b277c591116d23dfcb251
Author: Dan Wang <[email protected]>
AuthorDate: Thu Jan 29 21:37:38 2026 +0800

    feat(collector): Go Collector fetches and aggregates hotspot-related 
metrics (#2357)
    
    https://github.com/apache/incubator-pegasus/issues/2358.
    
    Periodically pull read/write related metrics from each replica server node
    to calculate traffic, in order to detect whether hotspots exist on each 
partition
    of each table.
---
 collector/config.yml                    |   5 +-
 collector/go.mod                        |   7 +-
 collector/go.sum                        |  12 +-
 collector/hotspot/partition_detector.go | 310 +++++++++++++++++++++++++++++++-
 collector/main.go                       |  14 +-
 collector/metrics/metric_client.go      |  87 +++++++++
 collector/metrics/metric_field.go       |  63 +++++++
 collector/metrics/metric_filter.go      |  79 ++++++++
 collector/metrics/metric_snapshot.go    |  46 +++++
 9 files changed, 598 insertions(+), 25 deletions(-)

diff --git a/collector/config.yml b/collector/config.yml
index 152033608..cfd153f1d 100644
--- a/collector/config.yml
+++ b/collector/config.yml
@@ -48,4 +48,7 @@ availability_detect:
   max_replica_count : 3
 
 hotspot:
-  partition_detect_interval : 10s
+  rpc_timeout : 5s
+  partition_detect_interval : 30s
+  pull_metrics_timeout : 5s
+  sample_metrics_interval : 10s
diff --git a/collector/go.mod b/collector/go.mod
index b86d0f6ed..6b6e2fa69 100644
--- a/collector/go.mod
+++ b/collector/go.mod
@@ -20,13 +20,14 @@ module github.com/apache/incubator-pegasus/collector
 go 1.18
 
 require (
-       github.com/apache/incubator-pegasus/go-client 
v0.0.0-20251112031012-5eb1665e0630
+       github.com/apache/incubator-pegasus/go-client 
v0.0.0-20260121121155-96868ed93b2a
        github.com/kataras/iris/v12 v12.2.0
        github.com/prometheus/client_golang v1.18.0
        github.com/sirupsen/logrus v1.8.1
        github.com/spf13/viper v1.7.1
        github.com/stretchr/testify v1.8.2
        github.com/tidwall/gjson v1.14.0
+       golang.org/x/sync v0.12.0
        gopkg.in/natefinch/lumberjack.v2 v2.0.0
        gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
        k8s.io/apimachinery v0.16.13
@@ -98,3 +99,7 @@ require (
        gopkg.in/yaml.v2 v2.4.0 // indirect
        gopkg.in/yaml.v3 v3.0.1 // indirect
 )
+
+// TODO(wangdan): Go 1.18 does not support golang.org/x/sync v0.12.0+ which 
depends on
+// context.WithCancelCause requiring Go 1.23.
+replace golang.org/x/sync => golang.org/x/sync v0.11.0
diff --git a/collector/go.sum b/collector/go.sum
index fe5ee0e3e..dfb81a2c4 100644
--- a/collector/go.sum
+++ b/collector/go.sum
@@ -35,8 +35,8 @@ github.com/alecthomas/template 
v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
 github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod 
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
 github.com/andybalholm/brotli v1.0.5 
h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
 github.com/andybalholm/brotli v1.0.5/go.mod 
h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
-github.com/apache/incubator-pegasus/go-client 
v0.0.0-20251112031012-5eb1665e0630 
h1:W+pNxPZKNEBxx+G2V7KXuWpEXSyMPIHWJgLXiEQB4Uk=
-github.com/apache/incubator-pegasus/go-client 
v0.0.0-20251112031012-5eb1665e0630/go.mod 
h1:SQnz/3Qg6uH1tfl3MKmiYwNk+i5CZiMD9AtMOTZkpgw=
+github.com/apache/incubator-pegasus/go-client 
v0.0.0-20260121121155-96868ed93b2a 
h1:Vqws5uoQ/ibw4QcnDHdXIleiGunC1QmZaMCrJN0znEk=
+github.com/apache/incubator-pegasus/go-client 
v0.0.0-20260121121155-96868ed93b2a/go.mod 
h1:SQnz/3Qg6uH1tfl3MKmiYwNk+i5CZiMD9AtMOTZkpgw=
 github.com/apache/thrift v0.13.0 
h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
 github.com/apache/thrift v0.13.0/go.mod 
h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
 github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod 
h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
@@ -396,12 +396,8 @@ golang.org/x/net v0.38.0/go.mod 
h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod 
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
-golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod 
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
+golang.org/x/sync v0.11.0/go.mod 
h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
 golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod 
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/collector/hotspot/partition_detector.go 
b/collector/hotspot/partition_detector.go
index a1b55ef5a..5238aa459 100644
--- a/collector/hotspot/partition_detector.go
+++ b/collector/hotspot/partition_detector.go
@@ -18,9 +18,18 @@
 package hotspot
 
 import (
+       "context"
+       "fmt"
+       "strconv"
        "time"
 
+       "github.com/apache/incubator-pegasus/collector/metrics"
+       client "github.com/apache/incubator-pegasus/go-client/admin"
+       "github.com/apache/incubator-pegasus/go-client/idl/admin"
+       "github.com/apache/incubator-pegasus/go-client/idl/replication"
        log "github.com/sirupsen/logrus"
+       "github.com/spf13/viper"
+       "golang.org/x/sync/errgroup"
        "gopkg.in/tomb.v2"
 )
 
@@ -29,23 +38,61 @@ type PartitionDetector interface {
 }
 
 type PartitionDetectorConfig struct {
-       DetectInterval time.Duration
+       MetaServers           []string
+       RpcTimeout            time.Duration
+       DetectInterval        time.Duration
+       PullMetricsTimeout    time.Duration
+       SampleMetricsInterval time.Duration
 }
 
-func NewPartitionDetector(conf PartitionDetectorConfig) PartitionDetector {
-       return &partitionDetector{
-               detectInterval: conf.DetectInterval,
+func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
+       return &PartitionDetectorConfig{
+               MetaServers:           viper.GetStringSlice("meta_servers"),
+               RpcTimeout:            viper.GetDuration("hotspot.rpc_timeout"),
+               DetectInterval:        
viper.GetDuration("hotspot.partition_detect_interval"),
+               PullMetricsTimeout:    
viper.GetDuration("hotspot.pull_metrics_timeout"),
+               SampleMetricsInterval: 
viper.GetDuration("hotspot.sample_metrics_interval"),
        }
 }
 
-type partitionDetector struct {
-       detectInterval time.Duration
+func NewPartitionDetector(cfg *PartitionDetectorConfig) (PartitionDetector, 
error) {
+       if len(cfg.MetaServers) == 0 {
+               return nil, fmt.Errorf("MetaServers should not be empty")
+       }
+
+       if cfg.DetectInterval <= 0 {
+               return nil, fmt.Errorf("DetectInterval(%d) must be > 0", 
cfg.DetectInterval)
+       }
+
+       if cfg.PullMetricsTimeout <= 0 {
+               return nil, fmt.Errorf("PullMetricsTimeout(%d) must be > 0", 
cfg.PullMetricsTimeout)
+       }
+
+       if cfg.SampleMetricsInterval <= 0 {
+               return nil, fmt.Errorf("SampleMetricsInterval(%d) must be > 0", 
cfg.SampleMetricsInterval)
+       }
+
+       if cfg.DetectInterval <= cfg.SampleMetricsInterval {
+               return nil, fmt.Errorf("DetectInterval(%d) must be > 
SampleMetricsInterval(%d)",
+                       cfg.DetectInterval, cfg.SampleMetricsInterval)
+       }
+
+       return &partitionDetectorImpl{
+               cfg: cfg,
+       }, nil
+}
+
+type partitionDetectorImpl struct {
+       cfg *PartitionDetectorConfig
 }
 
-func (d *partitionDetector) Run(tom *tomb.Tomb) error {
+func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
+       ticker := time.NewTicker(d.cfg.DetectInterval)
+       defer ticker.Stop()
+
        for {
                select {
-               case <-time.After(d.detectInterval):
+               case <-ticker.C:
                        d.detect()
                case <-tom.Dying():
                        log.Info("Hotspot partition detector exited.")
@@ -54,5 +101,250 @@ func (d *partitionDetector) Run(tom *tomb.Tomb) error {
        }
 }
 
-func (d *partitionDetector) detect() {
+func (d *partitionDetectorImpl) detect() {
+       err := d.aggregate()
+       if err != nil {
+               log.Error("failed to aggregate metrics for hotspot: ", err)
+       }
+}
+
+// {appID -> appStats}.
+type appStatsMap map[int32]appStats
+
+type appStats struct {
+       appName          string
+       partitionCount   int32
+       partitionConfigs []*replication.PartitionConfiguration
+       partitionStats   []map[string]float64 // {metric_name -> metric_value} 
for each partition.
+}
+
+func (d *partitionDetectorImpl) aggregate() error {
+       adminClient := client.NewClient(client.Config{
+               MetaServers: d.cfg.MetaServers,
+               Timeout:     d.cfg.RpcTimeout,
+       })
+       defer adminClient.Close()
+
+       // appMap is the final structure that includes all the statistical 
values.
+       appMap, err := pullTablePartitions(adminClient)
+       if err != nil {
+               return err
+       }
+
+       err = d.aggregateMetrics(adminClient, appMap)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// Pull metadata of all available tables with all their partitions and form 
the final structure
+// that includes all the statistical values.
+func pullTablePartitions(adminClient client.Client) (appStatsMap, error) {
+       tables, err := adminClient.ListTables()
+       if err != nil {
+               return nil, err
+       }
+
+       appMap := make(appStatsMap)
+       for _, table := range tables {
+               // Query metadata for each partition of each table.
+               appID, partitionCount, partitionConfigs, err := 
adminClient.QueryConfig(table.AppName)
+               if err != nil {
+                       return nil, err
+               }
+
+               // Initialize statistical value for each partition.
+               partitionStats := make([]map[string]float64, 0, 
len(partitionConfigs))
+               for range partitionConfigs {
+                       m := make(map[string]float64)
+                       partitionStats = append(partitionStats, m)
+               }
+
+               appMap[appID] = appStats{
+                       appName:          table.AppName,
+                       partitionCount:   partitionCount,
+                       partitionConfigs: partitionConfigs,
+                       partitionStats:   partitionStats,
+               }
+       }
+
+       return appMap, nil
+}
+
+type aggregator func(map[string]float64, string, float64)
+
+// Pull metrics from all nodes and aggregate them to produce the statistics.
+func (d *partitionDetectorImpl) aggregateMetrics(adminClient client.Client, 
appMap appStatsMap) error {
+       nodes, err := adminClient.ListNodes()
+       if err != nil {
+               return err
+       }
+
+       // Pull multiple results of metrics to perform cumulative calculation 
to produce the
+       // statistics such as QPS.
+       startSnapshots, err := d.pullMetrics(nodes)
+       if err != nil {
+               return err
+       }
+
+       time.Sleep(d.cfg.SampleMetricsInterval)
+
+       endSnapshots, err := d.pullMetrics(nodes)
+       if err != nil {
+               return err
+       }
+
+       for i, snapshot := range endSnapshots {
+               if snapshot.TimestampNS <= startSnapshots[i].TimestampNS {
+                       return fmt.Errorf("end timestamp (%d) must be greater 
than start timestamp (%d)",
+                               snapshot.TimestampNS, 
startSnapshots[i].TimestampNS)
+               }
+
+               d.calculateStats(snapshot, nodes[i],
+                       func(stats map[string]float64, key string, operand 
float64) {
+                               // Just set the ending number of requests.
+                               stats[key] = operand
+                       },
+                       appMap)
+       }
+
+       for i, snapshot := range startSnapshots {
+               d.calculateStats(snapshot, nodes[i],
+                       func(duration time.Duration) aggregator {
+                               return func(stats map[string]float64, key 
string, operand float64) {
+                                       value, ok := stats[key]
+                                       if !ok || value < operand {
+                                               stats[key] = 0
+                                               return
+                                       }
+
+                                       // Calculate QPS based on the ending 
number of requests that have been
+                                       // set previously.
+                                       stats[key] = (value - operand) / 
duration.Seconds()
+                               }
+                       
}(time.Duration(endSnapshots[i].TimestampNS-snapshot.TimestampNS)),
+                       appMap)
+       }
+
+       return nil
+}
+
+var (
+       readMetricNames = []string{
+               metrics.MetricReplicaGetRequests,
+               metrics.MetricReplicaMultiGetRequests,
+               metrics.MetricReplicaBatchGetRequests,
+               metrics.MetricReplicaScanRequests,
+       }
+
+       writeMetricNames = []string{
+               metrics.MetricReplicaPutRequests,
+               metrics.MetricReplicaMultiGetRequests,
+               metrics.MetricReplicaRemoveRequests,
+               metrics.MetricReplicaMultiRemoveRequests,
+               metrics.MetricReplicaIncrRequests,
+               metrics.MetricReplicaCheckAndSetRequests,
+               metrics.MetricReplicaCheckAndMutateRequests,
+               metrics.MetricReplicaDupRequests,
+       }
+
+       metricFilter = metrics.NewMetricBriefValueFilter(
+               []string{metrics.MetricEntityTypeReplica},
+               []string{},
+               map[string]string{},
+               append(append([]string(nil), readMetricNames...), 
writeMetricNames...),
+       )
+)
+
+func (d *partitionDetectorImpl) pullMetrics(nodes []*admin.NodeInfo) 
([]*metrics.MetricQueryBriefValueSnapshot, error) {
+       results := make([]*metrics.MetricQueryBriefValueSnapshot, len(nodes))
+
+       ctx, cancel := context.WithTimeout(context.Background(), 
d.cfg.PullMetricsTimeout)
+       defer cancel()
+
+       // Pull the metrics simultaneously from all nodes.
+       eg, ctx := errgroup.WithContext(ctx)
+       puller := func(index int, node *admin.NodeInfo) func() error {
+               return func() error {
+                       // Create a client for each target node.
+                       metricClient := 
metrics.NewMetricClient(&metrics.MetricClientConfig{
+                               Host: node.HpNode.GetHost(),
+                               Port: node.HpNode.GetPort(),
+                       })
+
+                       // Pull the metrics from the target node.
+                       snapshot, err := 
metricClient.GetBriefValueSnapshot(ctx, metricFilter)
+                       if err != nil {
+                               return err
+                       }
+
+                       // Place the pulled result into the position in the 
slice that corresponds to
+                       // the target node.
+                       results[index] = snapshot
+                       return nil
+               }
+       }
+
+       for i, node := range nodes {
+               // Launch one Go routine for each target node to pull metrics 
from it.
+               eg.Go(puller(i, node))
+       }
+
+       // Wait for all requests to be finished.
+       if err := eg.Wait(); err != nil {
+               return nil, err
+       }
+
+       return results, nil
+}
+
+func (d *partitionDetectorImpl) calculateStats(
+       snapshot *metrics.MetricQueryBriefValueSnapshot,
+       node *admin.NodeInfo,
+       adder aggregator,
+       appMap appStatsMap) {
+       for _, entity := range snapshot.Entities {
+               // The metric must belong to the entity of "replica".
+               if entity.Type != metrics.MetricEntityTypeReplica {
+                       continue
+               }
+
+               // The metric must have valid table id.
+               appID, err := 
strconv.Atoi(entity.Attributes[metrics.MetricEntityTableID])
+               if err != nil {
+                       continue
+               }
+
+               // The table must exist in the returned metadata, which means 
it is available.
+               stats, ok := appMap[int32(appID)]
+               if !ok {
+                       continue
+               }
+
+               // The metric must have valid partition id.
+               partitionID, err := 
strconv.Atoi(entity.Attributes[metrics.MetricEntityPartitionID])
+               if err != nil {
+                       continue
+               }
+
+               // The partition id should be less than the number of 
partitions.
+               if partitionID >= len(stats.partitionConfigs) {
+                       continue
+               }
+
+               // Only primary replica of a partition will be counted.
+               // TODO(wangdan): support Equal() for base.HostPort.
+               primary := stats.partitionConfigs[partitionID].HpPrimary
+               if primary.GetHost() != node.HpNode.GetHost() ||
+                       primary.GetPort() != node.HpNode.GetPort() {
+                       continue
+               }
+
+               for _, metric := range entity.Metrics {
+                       // Perform cumulative calculation for each statistical 
value.
+                       adder(stats.partitionStats[partitionID], metric.Name, 
metric.Value)
+               }
+       }
 }
diff --git a/collector/main.go b/collector/main.go
index e584d73a4..0c977c17e 100644
--- a/collector/main.go
+++ b/collector/main.go
@@ -18,7 +18,6 @@
 package main
 
 import (
-       "errors"
        "fmt"
        "os"
        "os/signal"
@@ -86,9 +85,10 @@ func main() {
        registry := prometheus.NewRegistry()
        webui.StartWebServer(registry)
 
+       // TODO(wangdan): consider replacing tomb since it has not been 
released since 2017.
        tom := &tomb.Tomb{}
        setupSignalHandler(func() {
-               tom.Kill(errors.New("Collector terminates")) // kill other 
goroutines
+               tom.Kill(nil) // kill other goroutines
        })
 
        tom.Go(func() error {
@@ -105,15 +105,17 @@ func main() {
        })
 
        tom.Go(func() error {
-               conf := hotspot.PartitionDetectorConfig{
-                       DetectInterval: 
viper.GetDuration("hotspot.partition_detect_interval"),
+               partitionDetector, err := 
hotspot.NewPartitionDetector(hotspot.LoadPartitionDetectorConfig())
+               if err != nil {
+                       log.Fatalf("failed to create partition detector for 
hotspot: %v", err)
                }
-               return hotspot.NewPartitionDetector(conf).Run(tom)
+
+               return partitionDetector.Run(tom)
        })
 
        err := tom.Wait()
        if err != nil {
-               log.Error("Collector exited abnormally:", err)
+               log.Error("Collector exited abnormally: ", err)
                return
        }
 
diff --git a/collector/metrics/metric_client.go 
b/collector/metrics/metric_client.go
new file mode 100644
index 000000000..be1db6370
--- /dev/null
+++ b/collector/metrics/metric_client.go
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "net"
+       "net/http"
+       "net/url"
+       "strconv"
+)
+
+// MetricClient encapsulates the APIs that are used to pull metrics from a 
specified target node.
+type MetricClient interface {
+       // GetBriefValueSnapshot pulls the metrics according to the query 
defined by filter.
+       GetBriefValueSnapshot(ctx context.Context, filter 
MetricBriefValueFilter) (*MetricQueryBriefValueSnapshot, error)
+}
+
+type MetricClientConfig struct {
+       Host string // The host of the target node
+       Port uint16 // The port of the target node
+}
+
+func NewMetricClient(cfg *MetricClientConfig) MetricClient {
+       return &metricClientImpl{
+               host:   net.JoinHostPort(cfg.Host, strconv.Itoa(int(cfg.Port))),
+               client: &http.Client{},
+       }
+}
+
+type metricClientImpl struct {
+       host   string // The address(<HOST>:<PORT>) of the target node
+       client *http.Client
+}
+
+func (c *metricClientImpl) GetBriefValueSnapshot(ctx context.Context, filter 
MetricBriefValueFilter) (*MetricQueryBriefValueSnapshot, error) {
+       u := url.URL{
+               Scheme:   "http",
+               Host:     c.host,
+               Path:     MetricQueryPath,
+               RawQuery: filter.ToQueryFields().Encode(), // Generate query 
string
+       }
+
+       // Create the http request to the target node.
+       req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), 
nil)
+       if err != nil {
+               return nil, err
+       }
+
+       // Send the http request and get the response.
+       resp, err := c.client.Do(req)
+       if err != nil {
+               return nil, err
+       }
+
+       defer resp.Body.Close()
+
+       // Fail the request if the status is not OK.
+       if resp.StatusCode != http.StatusOK {
+               return nil, fmt.Errorf("bad status: %s", resp.Status)
+       }
+
+       // Decode the response as the desired object.
+       var result MetricQueryBriefValueSnapshot
+       if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
+               return nil, err
+       }
+
+       return &result, err
+}
diff --git a/collector/metrics/metric_field.go 
b/collector/metrics/metric_field.go
new file mode 100644
index 000000000..6e7c53471
--- /dev/null
+++ b/collector/metrics/metric_field.go
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+const (
+       MetricQueryPath             string = "/metrics"
+       MetricQueryWithMetricFields string = "with_metric_fields"
+       MetricQueryTypes            string = "types"
+       MetricQueryIDs              string = "ids"
+       MetricQueryAttributes       string = "attributes"
+       MetricQueryMetrics          string = "metrics"
+
+       MetricRegistryFieldCluster     string = "cluster"
+       MetricRegistryFieldRole        string = "role"
+       MetricRegistryFieldHost        string = "host"
+       MetricRegistryFieldPort        string = "port"
+       MetricRegistryFieldTimestampNs string = "timestamp_ns"
+       MetricRegistryFieldEntities    string = "entities"
+
+       MetricEntityFieldType    string = "type"
+       MetricEntityFieldId      string = "id"
+       MetricEntityFieldAttrs   string = "attributes"
+       MetricEntityFieldMetrics string = "metrics"
+
+       MetricEntityTypeReplica string = "replica"
+       MetricEntityTableID     string = "table_id"
+       MetricEntityPartitionID string = "partition_id"
+
+       MetricFieldType        string = "type"
+       MetricFieldName        string = "name"
+       MetricFieldUnit        string = "unit"
+       MetricFieldDesc        string = "desc"
+       MetricFieldSingleValue string = "value"
+
+       MetricReplicaGetRequests      string = "get_requests"
+       MetricReplicaMultiGetRequests string = "multi_get_requests"
+       MetricReplicaBatchGetRequests string = "batch_get_requests"
+       MetricReplicaScanRequests     string = "scan_requests"
+
+       MetricReplicaPutRequests            string = "put_requests"
+       MetricReplicaMultiPutRequests       string = "multi_put_requests"
+       MetricReplicaRemoveRequests         string = "remove_requests"
+       MetricReplicaMultiRemoveRequests    string = "multi_remove_requests"
+       MetricReplicaIncrRequests           string = "incr_requests"
+       MetricReplicaCheckAndSetRequests    string = "check_and_set_requests"
+       MetricReplicaCheckAndMutateRequests string = "check_and_mutate_requests"
+       MetricReplicaDupRequests            string = "dup_requests"
+)
diff --git a/collector/metrics/metric_filter.go 
b/collector/metrics/metric_filter.go
new file mode 100644
index 000000000..3c423c462
--- /dev/null
+++ b/collector/metrics/metric_filter.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+       "net/url"
+       "strings"
+)
+
+// MetricBriefValueFilter is used to set each field of the query for "Brief 
Value" (i.e.
+// asking for only 2 fields for each metric: "name" and "value"), and generate 
query string
+// of the http request to the target node. Also see *BriefValue* structures in 
metric_snapshot.go.
+type MetricBriefValueFilter interface {
+       // ToQueryFields generates the query string according to the required 
fields.
+       ToQueryFields() url.Values
+}
+
+func NewMetricBriefValueFilter(
+       entityTypes []string,
+       entityIDs []string,
+       entityAttributes map[string]string,
+       entityMetrics []string,
+) MetricBriefValueFilter {
+       return &metricFilter{
+               WithMetricFields: []string{MetricFieldName, 
MetricFieldSingleValue},
+               EntityTypes:      entityTypes,
+               EntityIDs:        entityIDs,
+               EntityAttributes: entityAttributes,
+               EntityMetrics:    entityMetrics,
+       }
+}
+
+type metricFilter struct {
+       WithMetricFields []string
+       EntityTypes      []string
+       EntityIDs        []string
+       EntityAttributes map[string]string
+       EntityMetrics    []string
+}
+
+func (filter *metricFilter) ToQueryFields() url.Values {
+       fields := make(url.Values)
+
+       fields.Set(MetricQueryWithMetricFields, 
joinValues(filter.WithMetricFields))
+       fields.Set(MetricQueryTypes, joinValues(filter.EntityTypes))
+       fields.Set(MetricQueryIDs, joinValues(filter.EntityIDs))
+
+       // Join all the attributes in a map into one string as the value of the 
attribute field:
+       // "key0,val0,key1,val1,key2,val2,..."
+       attrs := make([]string, 0, len(filter.EntityAttributes)*2)
+       for attrKey, attrValue := range filter.EntityAttributes {
+               attrs = append(attrs, attrKey, attrValue)
+       }
+       fields.Set(MetricQueryAttributes, joinValues(attrs))
+
+       fields.Set(MetricQueryMetrics, joinValues(filter.EntityMetrics))
+
+       return fields
+}
+
+// Use comma to join all elements of a slice as the value of a field in a 
query.
+func joinValues(values []string) string {
+       return strings.Join(values, ",")
+}
diff --git a/collector/metrics/metric_snapshot.go 
b/collector/metrics/metric_snapshot.go
new file mode 100644
index 000000000..ef08f3881
--- /dev/null
+++ b/collector/metrics/metric_snapshot.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+type MetricQueryBaseInfo struct {
+       Cluster     string `json:"cluster"`
+       Role        string `json:"role"`
+       Host        string `json:"host"`
+       Port        int    `json:"port"`
+       TimestampNS int64  `json:"timestamp_ns"`
+}
+
+// All the *BriefValue* structures are used as the target objects for the http 
requests
+// that ask for only 2 fields of each metric: "name" and "value".
+
+type MetricQueryBriefValueSnapshot struct {
+       MetricQueryBaseInfo
+       Entities []MetricEntityBriefValueSnapshot `json:"entities"`
+}
+
+type MetricEntityBriefValueSnapshot struct {
+       Type       string                     `json:"type"`
+       ID         string                     `json:"id"`
+       Attributes map[string]string          `json:"attributes"`
+       Metrics    []MetricBriefValueSnapshot `json:"metrics"`
+}
+
+type MetricBriefValueSnapshot struct {
+       Name  string  `json:"name"`
+       Value float64 `json:"value"`
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to