This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 9b4c8cfb3 feat(collector): Go Collector fetches and aggregates
hotspot-related metrics (#2357)
9b4c8cfb3 is described below
commit 9b4c8cfb3ecac5fd272b277c591116d23dfcb251
Author: Dan Wang <[email protected]>
AuthorDate: Thu Jan 29 21:37:38 2026 +0800
feat(collector): Go Collector fetches and aggregates hotspot-related
metrics (#2357)
https://github.com/apache/incubator-pegasus/issues/2358.
Periodically pull read/write related metrics from each replica server node
to calculate traffic, in order to detect whether hotspots exist on each
partition
of each table.
---
collector/config.yml | 5 +-
collector/go.mod | 7 +-
collector/go.sum | 12 +-
collector/hotspot/partition_detector.go | 310 +++++++++++++++++++++++++++++++-
collector/main.go | 14 +-
collector/metrics/metric_client.go | 87 +++++++++
collector/metrics/metric_field.go | 63 +++++++
collector/metrics/metric_filter.go | 79 ++++++++
collector/metrics/metric_snapshot.go | 46 +++++
9 files changed, 598 insertions(+), 25 deletions(-)
diff --git a/collector/config.yml b/collector/config.yml
index 152033608..cfd153f1d 100644
--- a/collector/config.yml
+++ b/collector/config.yml
@@ -48,4 +48,7 @@ availability_detect:
max_replica_count : 3
hotspot:
- partition_detect_interval : 10s
+ rpc_timeout : 5s
+ partition_detect_interval : 30s
+ pull_metrics_timeout : 5s
+ sample_metrics_interval : 10s
diff --git a/collector/go.mod b/collector/go.mod
index b86d0f6ed..6b6e2fa69 100644
--- a/collector/go.mod
+++ b/collector/go.mod
@@ -20,13 +20,14 @@ module github.com/apache/incubator-pegasus/collector
go 1.18
require (
- github.com/apache/incubator-pegasus/go-client
v0.0.0-20251112031012-5eb1665e0630
+ github.com/apache/incubator-pegasus/go-client
v0.0.0-20260121121155-96868ed93b2a
github.com/kataras/iris/v12 v12.2.0
github.com/prometheus/client_golang v1.18.0
github.com/sirupsen/logrus v1.8.1
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.8.2
github.com/tidwall/gjson v1.14.0
+ golang.org/x/sync v0.12.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
k8s.io/apimachinery v0.16.13
@@ -98,3 +99,7 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
+
+// TODO(wangdan): Go 1.18 does not support golang.org/x/sync v0.12.0+, which
depends on
+// context.WithCancelCause and requires Go 1.23.
+replace golang.org/x/sync => golang.org/x/sync v0.11.0
diff --git a/collector/go.sum b/collector/go.sum
index fe5ee0e3e..dfb81a2c4 100644
--- a/collector/go.sum
+++ b/collector/go.sum
@@ -35,8 +35,8 @@ github.com/alecthomas/template
v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod
h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/andybalholm/brotli v1.0.5
h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod
h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
-github.com/apache/incubator-pegasus/go-client
v0.0.0-20251112031012-5eb1665e0630
h1:W+pNxPZKNEBxx+G2V7KXuWpEXSyMPIHWJgLXiEQB4Uk=
-github.com/apache/incubator-pegasus/go-client
v0.0.0-20251112031012-5eb1665e0630/go.mod
h1:SQnz/3Qg6uH1tfl3MKmiYwNk+i5CZiMD9AtMOTZkpgw=
+github.com/apache/incubator-pegasus/go-client
v0.0.0-20260121121155-96868ed93b2a
h1:Vqws5uoQ/ibw4QcnDHdXIleiGunC1QmZaMCrJN0znEk=
+github.com/apache/incubator-pegasus/go-client
v0.0.0-20260121121155-96868ed93b2a/go.mod
h1:SQnz/3Qg6uH1tfl3MKmiYwNk+i5CZiMD9AtMOTZkpgw=
github.com/apache/thrift v0.13.0
h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
github.com/apache/thrift v0.13.0/go.mod
h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod
h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
@@ -396,12 +396,8 @@ golang.org/x/net v0.38.0/go.mod
h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod
h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
-golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod
h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
+golang.org/x/sync v0.11.0/go.mod
h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod
h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/collector/hotspot/partition_detector.go
b/collector/hotspot/partition_detector.go
index a1b55ef5a..5238aa459 100644
--- a/collector/hotspot/partition_detector.go
+++ b/collector/hotspot/partition_detector.go
@@ -18,9 +18,18 @@
package hotspot
import (
+ "context"
+ "fmt"
+ "strconv"
"time"
+ "github.com/apache/incubator-pegasus/collector/metrics"
+ client "github.com/apache/incubator-pegasus/go-client/admin"
+ "github.com/apache/incubator-pegasus/go-client/idl/admin"
+ "github.com/apache/incubator-pegasus/go-client/idl/replication"
log "github.com/sirupsen/logrus"
+ "github.com/spf13/viper"
+ "golang.org/x/sync/errgroup"
"gopkg.in/tomb.v2"
)
@@ -29,23 +38,61 @@ type PartitionDetector interface {
}
type PartitionDetectorConfig struct {
- DetectInterval time.Duration
+ MetaServers []string
+ RpcTimeout time.Duration
+ DetectInterval time.Duration
+ PullMetricsTimeout time.Duration
+ SampleMetricsInterval time.Duration
}
-func NewPartitionDetector(conf PartitionDetectorConfig) PartitionDetector {
- return &partitionDetector{
- detectInterval: conf.DetectInterval,
+func LoadPartitionDetectorConfig() *PartitionDetectorConfig {
+ return &PartitionDetectorConfig{
+ MetaServers: viper.GetStringSlice("meta_servers"),
+ RpcTimeout: viper.GetDuration("hotspot.rpc_timeout"),
+ DetectInterval:
viper.GetDuration("hotspot.partition_detect_interval"),
+ PullMetricsTimeout:
viper.GetDuration("hotspot.pull_metrics_timeout"),
+ SampleMetricsInterval:
viper.GetDuration("hotspot.sample_metrics_interval"),
}
}
-type partitionDetector struct {
- detectInterval time.Duration
+func NewPartitionDetector(cfg *PartitionDetectorConfig) (PartitionDetector,
error) {
+ if len(cfg.MetaServers) == 0 {
+ return nil, fmt.Errorf("MetaServers should not be empty")
+ }
+
+ if cfg.DetectInterval <= 0 {
+ return nil, fmt.Errorf("DetectInterval(%d) must be > 0",
cfg.DetectInterval)
+ }
+
+ if cfg.PullMetricsTimeout <= 0 {
+ return nil, fmt.Errorf("PullMetricsTimeout(%d) must be > 0",
cfg.PullMetricsTimeout)
+ }
+
+ if cfg.SampleMetricsInterval <= 0 {
+ return nil, fmt.Errorf("SampleMetricsInterval(%d) must be > 0",
cfg.SampleMetricsInterval)
+ }
+
+ if cfg.DetectInterval <= cfg.SampleMetricsInterval {
+ return nil, fmt.Errorf("DetectInterval(%d) must be >
SampleMetricsInterval(%d)",
+ cfg.DetectInterval, cfg.SampleMetricsInterval)
+ }
+
+ return &partitionDetectorImpl{
+ cfg: cfg,
+ }, nil
+}
+
+type partitionDetectorImpl struct {
+ cfg *PartitionDetectorConfig
}
-func (d *partitionDetector) Run(tom *tomb.Tomb) error {
+func (d *partitionDetectorImpl) Run(tom *tomb.Tomb) error {
+ ticker := time.NewTicker(d.cfg.DetectInterval)
+ defer ticker.Stop()
+
for {
select {
- case <-time.After(d.detectInterval):
+ case <-ticker.C:
d.detect()
case <-tom.Dying():
log.Info("Hotspot partition detector exited.")
@@ -54,5 +101,250 @@ func (d *partitionDetector) Run(tom *tomb.Tomb) error {
}
}
-func (d *partitionDetector) detect() {
+func (d *partitionDetectorImpl) detect() {
+ err := d.aggregate()
+ if err != nil {
+ log.Error("failed to aggregate metrics for hotspot: ", err)
+ }
+}
+
+// {appID -> appStats}.
+type appStatsMap map[int32]appStats
+
+type appStats struct {
+ appName string
+ partitionCount int32
+ partitionConfigs []*replication.PartitionConfiguration
+ partitionStats []map[string]float64 // {metric_name -> metric_value}
for each partition.
+}
+
+func (d *partitionDetectorImpl) aggregate() error {
+ adminClient := client.NewClient(client.Config{
+ MetaServers: d.cfg.MetaServers,
+ Timeout: d.cfg.RpcTimeout,
+ })
+ defer adminClient.Close()
+
+ // appMap is the final structure that includes all the statistical
values.
+ appMap, err := pullTablePartitions(adminClient)
+ if err != nil {
+ return err
+ }
+
+ err = d.aggregateMetrics(adminClient, appMap)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// Pull metadata of all available tables with all their partitions and form
the final structure
+// that includes all the statistical values.
+func pullTablePartitions(adminClient client.Client) (appStatsMap, error) {
+ tables, err := adminClient.ListTables()
+ if err != nil {
+ return nil, err
+ }
+
+ appMap := make(appStatsMap)
+ for _, table := range tables {
+ // Query metadata for each partition of each table.
+ appID, partitionCount, partitionConfigs, err :=
adminClient.QueryConfig(table.AppName)
+ if err != nil {
+ return nil, err
+ }
+
+ // Initialize statistical value for each partition.
+ partitionStats := make([]map[string]float64, 0,
len(partitionConfigs))
+ for range partitionConfigs {
+ m := make(map[string]float64)
+ partitionStats = append(partitionStats, m)
+ }
+
+ appMap[appID] = appStats{
+ appName: table.AppName,
+ partitionCount: partitionCount,
+ partitionConfigs: partitionConfigs,
+ partitionStats: partitionStats,
+ }
+ }
+
+ return appMap, nil
+}
+
+type aggregator func(map[string]float64, string, float64)
+
+// Pull metrics from all nodes and aggregate them to produce the statistics.
+func (d *partitionDetectorImpl) aggregateMetrics(adminClient client.Client,
appMap appStatsMap) error {
+ nodes, err := adminClient.ListNodes()
+ if err != nil {
+ return err
+ }
+
+ // Pull multiple results of metrics to perform cumulative calculation
to produce the
+ // statistics such as QPS.
+ startSnapshots, err := d.pullMetrics(nodes)
+ if err != nil {
+ return err
+ }
+
+ time.Sleep(d.cfg.SampleMetricsInterval)
+
+ endSnapshots, err := d.pullMetrics(nodes)
+ if err != nil {
+ return err
+ }
+
+ for i, snapshot := range endSnapshots {
+ if snapshot.TimestampNS <= startSnapshots[i].TimestampNS {
+ return fmt.Errorf("end timestamp (%d) must be greater
than start timestamp (%d)",
+ snapshot.TimestampNS,
startSnapshots[i].TimestampNS)
+ }
+
+ d.calculateStats(snapshot, nodes[i],
+ func(stats map[string]float64, key string, operand
float64) {
+ // Just set the ending number of requests.
+ stats[key] = operand
+ },
+ appMap)
+ }
+
+ for i, snapshot := range startSnapshots {
+ d.calculateStats(snapshot, nodes[i],
+ func(duration time.Duration) aggregator {
+ return func(stats map[string]float64, key
string, operand float64) {
+ value, ok := stats[key]
+ if !ok || value < operand {
+ stats[key] = 0
+ return
+ }
+
+ // Calculate QPS based on the ending
number of requests that have been
+ // set previously.
+ stats[key] = (value - operand) /
duration.Seconds()
+ }
+
}(time.Duration(endSnapshots[i].TimestampNS-snapshot.TimestampNS)),
+ appMap)
+ }
+
+ return nil
+}
+
+var (
+ readMetricNames = []string{
+ metrics.MetricReplicaGetRequests,
+ metrics.MetricReplicaMultiGetRequests,
+ metrics.MetricReplicaBatchGetRequests,
+ metrics.MetricReplicaScanRequests,
+ }
+
+ writeMetricNames = []string{
+ metrics.MetricReplicaPutRequests,
+ metrics.MetricReplicaMultiPutRequests,
+ metrics.MetricReplicaRemoveRequests,
+ metrics.MetricReplicaMultiRemoveRequests,
+ metrics.MetricReplicaIncrRequests,
+ metrics.MetricReplicaCheckAndSetRequests,
+ metrics.MetricReplicaCheckAndMutateRequests,
+ metrics.MetricReplicaDupRequests,
+ }
+
+ metricFilter = metrics.NewMetricBriefValueFilter(
+ []string{metrics.MetricEntityTypeReplica},
+ []string{},
+ map[string]string{},
+ append(append([]string(nil), readMetricNames...),
writeMetricNames...),
+ )
+)
+
+func (d *partitionDetectorImpl) pullMetrics(nodes []*admin.NodeInfo)
([]*metrics.MetricQueryBriefValueSnapshot, error) {
+ results := make([]*metrics.MetricQueryBriefValueSnapshot, len(nodes))
+
+ ctx, cancel := context.WithTimeout(context.Background(),
d.cfg.PullMetricsTimeout)
+ defer cancel()
+
+ // Pull the metrics simultaneously from all nodes.
+ eg, ctx := errgroup.WithContext(ctx)
+ puller := func(index int, node *admin.NodeInfo) func() error {
+ return func() error {
+ // Create a client for each target node.
+ metricClient :=
metrics.NewMetricClient(&metrics.MetricClientConfig{
+ Host: node.HpNode.GetHost(),
+ Port: node.HpNode.GetPort(),
+ })
+
+ // Pull the metrics from the target node.
+ snapshot, err :=
metricClient.GetBriefValueSnapshot(ctx, metricFilter)
+ if err != nil {
+ return err
+ }
+
+ // Place the pulled result into the position in the
slice that corresponds to
+ // the target node.
+ results[index] = snapshot
+ return nil
+ }
+ }
+
+ for i, node := range nodes {
+ // Launch one Go routine for each target node to pull metrics
from it.
+ eg.Go(puller(i, node))
+ }
+
+ // Wait for all requests to finish.
+ if err := eg.Wait(); err != nil {
+ return nil, err
+ }
+
+ return results, nil
+}
+
+func (d *partitionDetectorImpl) calculateStats(
+ snapshot *metrics.MetricQueryBriefValueSnapshot,
+ node *admin.NodeInfo,
+ adder aggregator,
+ appMap appStatsMap) {
+ for _, entity := range snapshot.Entities {
+ // The metric must belong to the entity of "replica".
+ if entity.Type != metrics.MetricEntityTypeReplica {
+ continue
+ }
+
+ // The metric must have valid table id.
+ appID, err :=
strconv.Atoi(entity.Attributes[metrics.MetricEntityTableID])
+ if err != nil {
+ continue
+ }
+
+ // The table must exist in the returned metadata, which means
it is available.
+ stats, ok := appMap[int32(appID)]
+ if !ok {
+ continue
+ }
+
+ // The metric must have valid partition id.
+ partitionID, err :=
strconv.Atoi(entity.Attributes[metrics.MetricEntityPartitionID])
+ if err != nil {
+ continue
+ }
+
+ // The partition id should be less than the number of
partitions.
+ if partitionID >= len(stats.partitionConfigs) {
+ continue
+ }
+
+ // Only primary replica of a partition will be counted.
+ // TODO(wangdan): support Equal() for base.HostPort.
+ primary := stats.partitionConfigs[partitionID].HpPrimary
+ if primary.GetHost() != node.HpNode.GetHost() ||
+ primary.GetPort() != node.HpNode.GetPort() {
+ continue
+ }
+
+ for _, metric := range entity.Metrics {
+ // Perform cumulative calculation for each statistical
value.
+ adder(stats.partitionStats[partitionID], metric.Name,
metric.Value)
+ }
+ }
}
diff --git a/collector/main.go b/collector/main.go
index e584d73a4..0c977c17e 100644
--- a/collector/main.go
+++ b/collector/main.go
@@ -18,7 +18,6 @@
package main
import (
- "errors"
"fmt"
"os"
"os/signal"
@@ -86,9 +85,10 @@ func main() {
registry := prometheus.NewRegistry()
webui.StartWebServer(registry)
+ // TODO(wangdan): consider replacing tomb since it has not been
released since 2017.
tom := &tomb.Tomb{}
setupSignalHandler(func() {
- tom.Kill(errors.New("Collector terminates")) // kill other
goroutines
+ tom.Kill(nil) // kill other goroutines
})
tom.Go(func() error {
@@ -105,15 +105,17 @@ func main() {
})
tom.Go(func() error {
- conf := hotspot.PartitionDetectorConfig{
- DetectInterval:
viper.GetDuration("hotspot.partition_detect_interval"),
+ partitionDetector, err :=
hotspot.NewPartitionDetector(hotspot.LoadPartitionDetectorConfig())
+ if err != nil {
+ log.Fatalf("failed to create partition detector for
hotspot: %v", err)
}
- return hotspot.NewPartitionDetector(conf).Run(tom)
+
+ return partitionDetector.Run(tom)
})
err := tom.Wait()
if err != nil {
- log.Error("Collector exited abnormally:", err)
+ log.Error("Collector exited abnormally: ", err)
return
}
diff --git a/collector/metrics/metric_client.go
b/collector/metrics/metric_client.go
new file mode 100644
index 000000000..be1db6370
--- /dev/null
+++ b/collector/metrics/metric_client.go
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net"
+ "net/http"
+ "net/url"
+ "strconv"
+)
+
+// MetricClient encapsulates the APIs that are used to pull metrics from a
specified target node.
+type MetricClient interface {
+ // GetBriefValueSnapshot pulls the metrics according to the query
defined by filter.
+ GetBriefValueSnapshot(ctx context.Context, filter
MetricBriefValueFilter) (*MetricQueryBriefValueSnapshot, error)
+}
+
+type MetricClientConfig struct {
+ Host string // The host of the target node
+ Port uint16 // The port of the target node
+}
+
+func NewMetricClient(cfg *MetricClientConfig) MetricClient {
+ return &metricClientImpl{
+ host: net.JoinHostPort(cfg.Host, strconv.Itoa(int(cfg.Port))),
+ client: &http.Client{},
+ }
+}
+
+type metricClientImpl struct {
+ host string // The address(<HOST>:<PORT>) of the target node
+ client *http.Client
+}
+
+func (c *metricClientImpl) GetBriefValueSnapshot(ctx context.Context, filter
MetricBriefValueFilter) (*MetricQueryBriefValueSnapshot, error) {
+ u := url.URL{
+ Scheme: "http",
+ Host: c.host,
+ Path: MetricQueryPath,
+ RawQuery: filter.ToQueryFields().Encode(), // Generate query
string
+ }
+
+ // Create the http request to the target node.
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(),
nil)
+ if err != nil {
+ return nil, err
+ }
+
+ // Send the http request and get the response.
+ resp, err := c.client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+
+ defer resp.Body.Close()
+
+ // Fail the request if the status is not OK.
+ if resp.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("bad status: %s", resp.Status)
+ }
+
+ // Decode the response as the desired object.
+ var result MetricQueryBriefValueSnapshot
+ if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
+ return nil, err
+ }
+
+ return &result, err
+}
diff --git a/collector/metrics/metric_field.go
b/collector/metrics/metric_field.go
new file mode 100644
index 000000000..6e7c53471
--- /dev/null
+++ b/collector/metrics/metric_field.go
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+const (
+ MetricQueryPath string = "/metrics"
+ MetricQueryWithMetricFields string = "with_metric_fields"
+ MetricQueryTypes string = "types"
+ MetricQueryIDs string = "ids"
+ MetricQueryAttributes string = "attributes"
+ MetricQueryMetrics string = "metrics"
+
+ MetricRegistryFieldCluster string = "cluster"
+ MetricRegistryFieldRole string = "role"
+ MetricRegistryFieldHost string = "host"
+ MetricRegistryFieldPort string = "port"
+ MetricRegistryFieldTimestampNs string = "timestamp_ns"
+ MetricRegistryFieldEntities string = "entities"
+
+ MetricEntityFieldType string = "type"
+ MetricEntityFieldId string = "id"
+ MetricEntityFieldAttrs string = "attributes"
+ MetricEntityFieldMetrics string = "metrics"
+
+ MetricEntityTypeReplica string = "replica"
+ MetricEntityTableID string = "table_id"
+ MetricEntityPartitionID string = "partition_id"
+
+ MetricFieldType string = "type"
+ MetricFieldName string = "name"
+ MetricFieldUnit string = "unit"
+ MetricFieldDesc string = "desc"
+ MetricFieldSingleValue string = "value"
+
+ MetricReplicaGetRequests string = "get_requests"
+ MetricReplicaMultiGetRequests string = "multi_get_requests"
+ MetricReplicaBatchGetRequests string = "batch_get_requests"
+ MetricReplicaScanRequests string = "scan_requests"
+
+ MetricReplicaPutRequests string = "put_requests"
+ MetricReplicaMultiPutRequests string = "multi_put_requests"
+ MetricReplicaRemoveRequests string = "remove_requests"
+ MetricReplicaMultiRemoveRequests string = "multi_remove_requests"
+ MetricReplicaIncrRequests string = "incr_requests"
+ MetricReplicaCheckAndSetRequests string = "check_and_set_requests"
+ MetricReplicaCheckAndMutateRequests string = "check_and_mutate_requests"
+ MetricReplicaDupRequests string = "dup_requests"
+)
diff --git a/collector/metrics/metric_filter.go
b/collector/metrics/metric_filter.go
new file mode 100644
index 000000000..3c423c462
--- /dev/null
+++ b/collector/metrics/metric_filter.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+import (
+ "net/url"
+ "strings"
+)
+
+// MetricBriefValueFilter is used to set each field of the query for "Brief
Value" (i.e.
+// asking for only 2 fields for each metric: "name" and "value"), and generate
query string
+// of the http request to the target node. Also see *BriefValue* structures in
metric_snapshot.go.
+type MetricBriefValueFilter interface {
+ // ToQueryFields generates the query string according to the required
fields.
+ ToQueryFields() url.Values
+}
+
+func NewMetricBriefValueFilter(
+ entityTypes []string,
+ entityIDs []string,
+ entityAttributes map[string]string,
+ entityMetrics []string,
+) MetricBriefValueFilter {
+ return &metricFilter{
+ WithMetricFields: []string{MetricFieldName,
MetricFieldSingleValue},
+ EntityTypes: entityTypes,
+ EntityIDs: entityIDs,
+ EntityAttributes: entityAttributes,
+ EntityMetrics: entityMetrics,
+ }
+}
+
+type metricFilter struct {
+ WithMetricFields []string
+ EntityTypes []string
+ EntityIDs []string
+ EntityAttributes map[string]string
+ EntityMetrics []string
+}
+
+func (filter *metricFilter) ToQueryFields() url.Values {
+ fields := make(url.Values)
+
+ fields.Set(MetricQueryWithMetricFields,
joinValues(filter.WithMetricFields))
+ fields.Set(MetricQueryTypes, joinValues(filter.EntityTypes))
+ fields.Set(MetricQueryIDs, joinValues(filter.EntityIDs))
+
+ // Join all the attributes in a map into one string as the value of the
attribute field:
+ // "key0,val0,key1,val1,key2,val2,..."
+ attrs := make([]string, 0, len(filter.EntityAttributes)*2)
+ for attrKey, attrValue := range filter.EntityAttributes {
+ attrs = append(attrs, attrKey, attrValue)
+ }
+ fields.Set(MetricQueryAttributes, joinValues(attrs))
+
+ fields.Set(MetricQueryMetrics, joinValues(filter.EntityMetrics))
+
+ return fields
+}
+
+// Use comma to join all elements of a slice as the value of a field in a
query.
+func joinValues(values []string) string {
+ return strings.Join(values, ",")
+}
diff --git a/collector/metrics/metric_snapshot.go
b/collector/metrics/metric_snapshot.go
new file mode 100644
index 000000000..ef08f3881
--- /dev/null
+++ b/collector/metrics/metric_snapshot.go
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package metrics
+
+type MetricQueryBaseInfo struct {
+ Cluster string `json:"cluster"`
+ Role string `json:"role"`
+ Host string `json:"host"`
+ Port int `json:"port"`
+ TimestampNS int64 `json:"timestamp_ns"`
+}
+
+// All the *BriefValue* structures are used as the target objects for the http
requests
+// that ask for only 2 fields of each metric: "name" and "value".
+
+type MetricQueryBriefValueSnapshot struct {
+ MetricQueryBaseInfo
+ Entities []MetricEntityBriefValueSnapshot `json:"entities"`
+}
+
+type MetricEntityBriefValueSnapshot struct {
+ Type string `json:"type"`
+ ID string `json:"id"`
+ Attributes map[string]string `json:"attributes"`
+ Metrics []MetricBriefValueSnapshot `json:"metrics"`
+}
+
+type MetricBriefValueSnapshot struct {
+ Name string `json:"name"`
+ Value float64 `json:"value"`
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]