This is an automated email from the ASF dual-hosted git repository.
wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
The following commit(s) were added to refs/heads/master by this push:
new 89b7c33b2 feat(go-client): introduce Prometheus for go-client
monitoring and added multiple metric collection sites (#2285)
89b7c33b2 is described below
commit 89b7c33b2d387b3cd28af05a9b916869b529d939
Author: nanorth <[email protected]>
AuthorDate: Mon Nov 10 15:23:36 2025 +0800
feat(go-client): introduce Prometheus for go-client monitoring and added
multiple metric collection sites (#2285)
### What problem does this PR solve? <!--add issue link with summary if
exists-->
This PR introduces Prometheus for go-client monitoring, and adds multiple
metric collection points to enhance observability of the system.
### What is changed and how does it work?
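This PR integrates the Prometheus client libraries
(github.com/prometheus/client_golang and github.com/prometheus/client_model) as
tools for metric instrumentation and data reporting. When `enable_prometheus` is
set in the client config, the client registers its summary metrics
(`pegasus_client_operations` and `pegasus_client_rpc_size`) and serves them over
HTTP at `/metrics` on `prometheus_port` (9090 by default).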
### Checklist <!--REMOVE the items that are not applicable-->
##### Tests <!-- At least one of them must be included. -->
- Manual test (add detailed scripts or steps below)
Used go-client to invoke several interfaces and verified that the metrics
were correctly reported to the Prometheus monitoring service.
<img width="1664" height="562" alt="image"
src="https://github.com/user-attachments/assets/5976457b-e8f4-4225-b53f-1d07b0f358da"
/>
##### Code changes
- Has exported function/method change
Added metric collection for operation counts and latencies in `func (p
*pegasusTableConnector) runPartitionOp`.
Added RPC request size metric collection in `func (n *nodeSession)
writeRequest`.
- Has exported variable/fields change
Added a new field `enablePerfCounter` in `nodeSession` to distinguish
between meta and replica nodes. Performance metrics are collected only for
replica nodes. The constructor has been updated accordingly.
- Package structure change
The `Config` struct has been moved from the `pegasus` package into the new
`config` package to avoid circular dependencies.
##### Related changes
- Need to update the documentation
Since the `Config` struct has been moved to a separate package, the example
code has also been updated to reflect this change.
---
go-client/admin/client.go | 3 +-
go-client/admin/client_test.go | 3 +-
go-client/admin/remote_cmd_client.go | 2 +-
go-client/config/config.go | 62 +++++++++++++
go-client/example/full_scan.md | 131 ++++++++++++++-------------
go-client/example/main.go | 3 +-
go-client/example/pegasus-client-config.json | 8 +-
go-client/go.mod | 14 ++-
go-client/go.sum | 34 +++++--
go-client/integration/failover-test/main.go | 3 +-
go-client/metrics/metrics.go | 97 ++++++++++++++++++++
go-client/pegasus/client.go | 24 +++--
go-client/pegasus/client_test.go | 15 +--
go-client/pegasus/config.go | 25 -----
go-client/pegasus/table_connector.go | 49 ++++++++--
go-client/pegasus/table_connector_test.go | 4 +-
go-client/session/meta_call.go | 2 +-
go-client/session/meta_session.go | 6 +-
go-client/session/replica_session.go | 13 ++-
go-client/session/session.go | 39 +++++---
go-client/session/session_test.go | 20 ++--
21 files changed, 395 insertions(+), 162 deletions(-)
diff --git a/go-client/admin/client.go b/go-client/admin/client.go
index d51a0b52a..81e607d5a 100644
--- a/go-client/admin/client.go
+++ b/go-client/admin/client.go
@@ -172,7 +172,8 @@ func (c *rpcBasedClient) CreateTable(tableName string,
partitionCount int32, rep
AppType: "pegasus",
IsStateful: true,
Envs: envs,
- }}
+ },
+ }
var appID int32
var respErr error
diff --git a/go-client/admin/client_test.go b/go-client/admin/client_test.go
index a263c7c5a..d768c025a 100644
--- a/go-client/admin/client_test.go
+++ b/go-client/admin/client_test.go
@@ -25,6 +25,7 @@ import (
"testing"
"time"
+ "github.com/apache/incubator-pegasus/go-client/config"
"github.com/apache/incubator-pegasus/go-client/idl/admin"
"github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/apache/incubator-pegasus/go-client/pegasus"
@@ -110,7 +111,7 @@ func TestAdmin_CreateTableMustAvailable(t *testing.T) {
}
// ensures the created table must be available for read and write
- rwClient := pegasus.NewClient(pegasus.Config{
+ rwClient := pegasus.NewClient(config.Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602",
"0.0.0.0:34603"},
})
defer func() {
diff --git a/go-client/admin/remote_cmd_client.go
b/go-client/admin/remote_cmd_client.go
index 8acf64467..5bedf7faa 100644
--- a/go-client/admin/remote_cmd_client.go
+++ b/go-client/admin/remote_cmd_client.go
@@ -33,7 +33,7 @@ type RemoteCmdClient struct {
// NewRemoteCmdClient returns an instance of RemoteCmdClient.
func NewRemoteCmdClient(addr string, nodeType session.NodeType)
*RemoteCmdClient {
return &RemoteCmdClient{
- session: session.NewNodeSession(addr, nodeType),
+ session: session.NewNodeSession(addr, nodeType,
session.DisableMetrics),
}
}
diff --git a/go-client/config/config.go b/go-client/config/config.go
new file mode 100644
index 000000000..7e82226cd
--- /dev/null
+++ b/go-client/config/config.go
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package config
+
+// Config is the configuration of pegasus client.
+type Config struct {
+ MetaServers []string `json:"meta_servers"`
+ PrometheusConstLabels map[string]string `json:"prometheus_const_labels"`
+ EnablePrometheus bool `json:"enable_prometheus"`
+ PrometheusPort int `json:"prometheus_port"`
+}
+
+type Option func(*Config)
+
+func NewConfig(metaServers []string, opts ...Option) *Config {
+ cfg := &Config{
+ MetaServers: metaServers,
+ PrometheusConstLabels: make(map[string]string),
+ EnablePrometheus: false,
+ PrometheusPort: 9090,
+ }
+
+ for _, opt := range opts {
+ opt(cfg)
+ }
+ return cfg
+}
+
+func WithPerfCounterTags(tags map[string]string) Option {
+ return func(c *Config) {
+ c.PrometheusConstLabels = tags
+ }
+}
+
+func WithEnablePrometheus(enable bool) Option {
+ return func(c *Config) {
+ c.EnablePrometheus = enable
+ }
+}
+
+func WithPrometheusPort(port int) Option {
+ return func(c *Config) {
+ c.PrometheusPort = port
+ }
+}
diff --git a/go-client/example/full_scan.md b/go-client/example/full_scan.md
index 524d53615..20dd617d7 100644
--- a/go-client/example/full_scan.md
+++ b/go-client/example/full_scan.md
@@ -34,79 +34,80 @@ lower than []bytes(oneYearAgoTs).
package main
import (
- "context"
- "encoding/binary"
- "time"
+ "context"
+ "encoding/binary"
+ "time"
- "github.com/apache/incubator-pegasus/go-client/pegalog"
- "github.com/apache/incubator-pegasus/go-client/pegasus"
+ "github.com/apache/incubator-pegasus/go-client/config"
+ "github.com/apache/incubator-pegasus/go-client/pegalog"
+ "github.com/apache/incubator-pegasus/go-client/pegasus"
)
func searchHistoryOneYearAgo() {
- // Customize where the pegasus-go-client's logs reside.
- pegalog.SetLogger(pegalog.NewLogrusLogger(&pegalog.LogrusConfig{
- Filename: "./pegasus.log",
- }))
- logger := pegalog.GetLogger()
+ // Customize where the pegasus-go-client's logs reside.
+ pegalog.SetLogger(pegalog.NewLogrusLogger(&pegalog.LogrusConfig{
+ Filename: "./pegasus.log",
+ }))
+ logger := pegalog.GetLogger()
- // Configure the meta addresses to access the pegasus cluster.
- cfg := &pegasus.Config{
- MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34601"},
- }
- c := pegasus.NewClient(*cfg)
+ // Configure the meta addresses to access the pegasus cluster.
+ cfg := &config.Config{
+ MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34601"},
+ }
+ c := pegasus.NewClient(*cfg)
- // Establish the connections to replica-servers.
- ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
- tb, err := c.OpenTable(ctx, "user_history")
- if err != nil {
- logger.Print(err)
- return
- }
- logger.Print("opened table user_history")
+ // Establish the connections to replica-servers.
+ ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
+ tb, err := c.OpenTable(ctx, "user_history")
+ if err != nil {
+ logger.Print(err)
+ return
+ }
+ logger.Print("opened table user_history")
- // Set up the scanners.
- ctx, _ = context.WithTimeout(context.Background(), time.Second*10)
- sopts := &pegasus.ScannerOptions{
- BatchSize: 20,
- // Values can be optimized out during scanning to reduce the workload.
- NoValue: true,
- }
- scanners, err := tb.GetUnorderedScanners(ctx, 16, sopts)
- if err != nil {
- logger.Print(err)
- }
- logger.Printf("opened %d scanners", len(scanners))
- oneYearAgo := int(time.Now().AddDate(-1, 0, 0).UnixNano() / 1000 / 1000)
- for i, scanner := range scanners {
- // Iterates sequentially.
+ // Set up the scanners.
+ ctx, _ = context.WithTimeout(context.Background(), time.Second*10)
+ sopts := &pegasus.ScannerOptions{
+ BatchSize: 20,
+ // Values can be optimized out during scanning to reduce the
workload.
+ NoValue: true,
+ }
+ scanners, err := tb.GetUnorderedScanners(ctx, 16, sopts)
+ if err != nil {
+ logger.Print(err)
+ }
+ logger.Printf("opened %d scanners", len(scanners))
+ oneYearAgo := int(time.Now().AddDate(-1, 0, 0).UnixNano() / 1000 / 1000)
+ for i, scanner := range scanners {
+ // Iterates sequentially.
- start := time.Now()
- cnt := 0
- for true {
- ctx, _ = context.WithTimeout(context.Background(), time.Second*10)
- completed, hashKey, sortKey, _, err := scanner.Next(ctx)
- if err != nil {
- logger.Print(err)
- return
- }
- if completed {
- logger.Printf("scanner %d completes", i)
- break
- }
- if len(sortKey) == 8 {
- res := int(binary.BigEndian.Uint64(sortKey))
- if res < oneYearAgo {
- logger.Printf("hashkey=%s, sortkey=%d\n", string(hashKey),
res)
- }
- }
+ start := time.Now()
+ cnt := 0
+ for true {
+ ctx, _ = context.WithTimeout(context.Background(),
time.Second*10)
+ completed, hashKey, sortKey, _, err := scanner.Next(ctx)
+ if err != nil {
+ logger.Print(err)
+ return
+ }
+ if completed {
+ logger.Printf("scanner %d completes", i)
+ break
+ }
+ if len(sortKey) == 8 {
+ res := int(binary.BigEndian.Uint64(sortKey))
+ if res < oneYearAgo {
+ logger.Printf("hashkey=%s,
sortkey=%d\n", string(hashKey), res)
+ }
+ }
- cnt++
- if time.Now().Sub(start) > time.Minute {
- logger.Printf("scan 1-min, %d rows in total", cnt)
- start = time.Now()
- }
- }
- }
- logger.Print("program exits")
+ cnt++
+ if time.Now().Sub(start) > time.Minute {
+ logger.Printf("scan 1-min, %d rows in total",
cnt)
+ start = time.Now()
+ }
+ }
+ }
+ logger.Print("program exits")
}
```
diff --git a/go-client/example/main.go b/go-client/example/main.go
index cab1c462e..f81f2d2f8 100644
--- a/go-client/example/main.go
+++ b/go-client/example/main.go
@@ -27,6 +27,7 @@ import (
"path/filepath"
"time"
+ "github.com/apache/incubator-pegasus/go-client/config"
"github.com/apache/incubator-pegasus/go-client/pegalog"
"github.com/apache/incubator-pegasus/go-client/pegasus"
)
@@ -48,7 +49,7 @@ func main() {
}))
logger := pegalog.GetLogger()
- cfg := &pegasus.Config{}
+ cfg := &config.Config{}
json.Unmarshal(rawCfg, cfg)
c := pegasus.NewClient(*cfg)
diff --git a/go-client/example/pegasus-client-config.json
b/go-client/example/pegasus-client-config.json
index 1cf688269..2db1526a7 100644
--- a/go-client/example/pegasus-client-config.json
+++ b/go-client/example/pegasus-client-config.json
@@ -3,5 +3,11 @@
"127.0.0.1:34601",
"127.0.0.1:34602",
"127.0.0.1:34603"
- ]
+ ],
+ "prometheus_const_labels": {
+ "k1": "v1",
+ "k2": "v2"
+ },
+ "enable_prometheus": false,
+ "prometheus_port": 9090
}
\ No newline at end of file
diff --git a/go-client/go.mod b/go-client/go.mod
index 08a1cea1d..911d10045 100644
--- a/go-client/go.mod
+++ b/go-client/go.mod
@@ -24,7 +24,8 @@ require (
github.com/apache/thrift v0.13.0
github.com/cenkalti/backoff/v4 v4.1.0
github.com/fortytw2/leaktest v1.3.0
- github.com/sirupsen/logrus v1.4.2
+ github.com/prometheus/client_golang v1.18.0
+ github.com/sirupsen/logrus v1.6.0
github.com/stretchr/testify v1.4.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
@@ -33,10 +34,17 @@ require (
require (
github.com/BurntSushi/toml v0.3.1 // indirect
+ github.com/beorn7/perks v1.0.1 // indirect
+ github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
- github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
+ github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
+ github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
+ github.com/prometheus/client_model v0.5.0 // indirect
+ github.com/prometheus/common v0.45.0 // indirect
+ github.com/prometheus/procfs v0.12.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
- gopkg.in/yaml.v2 v2.2.8 // indirect
+ google.golang.org/protobuf v1.32.0 // indirect
+ gopkg.in/yaml.v2 v2.4.0 // indirect
)
diff --git a/go-client/go.sum b/go-client/go.sum
index f4a3321a9..94db4da94 100644
--- a/go-client/go.sum
+++ b/go-client/go.sum
@@ -7,8 +7,12 @@ github.com/agiledragon/gomonkey v2.0.2+incompatible
h1:eXKi9/piiC3cjJD1658mEE2o3
github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod
h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw=
github.com/apache/thrift v0.13.0
h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
github.com/apache/thrift v0.13.0/go.mod
h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.1.0
h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod
h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
+github.com/cespare/xxhash/v2 v2.2.0
h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.0/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -32,6 +36,7 @@ github.com/golang/protobuf
v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+
github.com/golang/protobuf v1.2.0/go.mod
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.3.0/go.mod
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod
h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
github.com/google/gofuzz v1.0.0/go.mod
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.1/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -42,14 +47,16 @@ github.com/json-iterator/go
v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBv
github.com/json-iterator/go v1.1.7/go.mod
h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/kisielk/errcheck v1.2.0/go.mod
h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod
h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
-github.com/konsorten/go-windows-terminal-sequences v1.0.1
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
-github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
-github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/konsorten/go-windows-terminal-sequences v1.0.3
h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
+github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0/go.mod
h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod
h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
+github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0
h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
+github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod
h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180320133207-05fbef0ca5da/go.mod
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
@@ -65,12 +72,20 @@ github.com/onsi/gomega v1.5.0/go.mod
h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.0
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/sirupsen/logrus v1.4.2
h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
-github.com/sirupsen/logrus v1.4.2/go.mod
h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/prometheus/client_golang v1.18.0
h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
+github.com/prometheus/client_golang v1.18.0/go.mod
h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
+github.com/prometheus/client_model v0.5.0
h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
+github.com/prometheus/client_model v0.5.0/go.mod
h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
+github.com/prometheus/common v0.45.0
h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
+github.com/prometheus/common v0.45.0/go.mod
h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
+github.com/prometheus/procfs v0.12.0
h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
+github.com/prometheus/procfs v0.12.0/go.mod
h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
+github.com/rogpeppe/go-internal v1.10.0
h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
+github.com/sirupsen/logrus v1.6.0
h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
+github.com/sirupsen/logrus v1.6.0/go.mod
h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod
h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5/go.mod
h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/objx v0.1.1/go.mod
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@@ -96,9 +111,11 @@ golang.org/x/text v0.3.2/go.mod
h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+google.golang.org/protobuf v1.32.0
h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
+google.golang.org/protobuf v1.32.0/go.mod
h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127
h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/fsnotify.v1 v1.4.7/go.mod
h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/natefinch/lumberjack.v2 v2.0.0
h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
@@ -108,8 +125,9 @@ gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
h1:yiW+nvdHb9LVqSHQBXfZCieqV
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod
h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
k8s.io/apimachinery v0.16.13 h1:E40YK/NhqhUubG44ZHQULa4Pn+8NnXMAE6awvQ97Pyg=
k8s.io/apimachinery v0.16.13/go.mod
h1:4HMHS3mDHtVttspuuhrJ1GGr/0S9B6iWYWZ57KnnZqQ=
k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod
h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
diff --git a/go-client/integration/failover-test/main.go
b/go-client/integration/failover-test/main.go
index 481059283..356f39d56 100644
--- a/go-client/integration/failover-test/main.go
+++ b/go-client/integration/failover-test/main.go
@@ -25,6 +25,7 @@ import (
"strings"
"time"
+ "github.com/apache/incubator-pegasus/go-client/config"
"github.com/apache/incubator-pegasus/go-client/pegalog"
"github.com/apache/incubator-pegasus/go-client/pegasus"
)
@@ -35,7 +36,7 @@ import (
// Pegasus cluster is going well, whether the go-client gets work in expected
time.
func main() {
- client := pegasus.NewClient(pegasus.Config{MetaServers:
[]string{"172.21.0.11:35601"}})
+ client := pegasus.NewClient(config.Config{MetaServers:
[]string{"172.21.0.11:35601"}})
defer client.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
diff --git a/go-client/metrics/metrics.go b/go-client/metrics/metrics.go
new file mode 100644
index 000000000..2b19caf69
--- /dev/null
+++ b/go-client/metrics/metrics.go
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package metrics
+
+import (
+ "fmt"
+ "net/http"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/apache/incubator-pegasus/go-client/config"
+ "github.com/apache/incubator-pegasus/go-client/pegalog"
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+type Summary struct {
+ summary *prometheus.SummaryVec
+}
+
+var (
+ once sync.Once
+ constLabels map[string]string
+ PegasusClientOperationsSummary *Summary
+ PegasusClientRpcSizeSummary *Summary
+)
+
+func InitMetrics(cfg config.Config) {
+ once.Do(func() {
+ constLabels = cfg.PrometheusConstLabels
+ constLabels["endpoint"] = GetLocalHostName()
+
+ port := 9090
+ if cfg.PrometheusPort > 0 {
+ port = cfg.PrometheusPort
+ }
+ go func() {
+ http.Handle("/metrics", promhttp.Handler())
+ addr := fmt.Sprintf(":%d", port)
+ pegalog.GetLogger().Print("Starting Prometheus metrics
server on", addr)
+ if err := http.ListenAndServe(addr, nil); err != nil {
+ pegalog.GetLogger().Fatal("Failed to start
Prometheus metrics server:", err)
+ }
+ }()
+
+ PegasusClientOperationsSummary =
RegisterPromSummary("pegasus_client_operations", []string{"table", "operation",
"status", "meta_addresses"})
+ PegasusClientRpcSizeSummary =
RegisterPromSummary("pegasus_client_rpc_size", []string{"type"})
+ })
+}
+
+func RegisterPromSummary(summaryName string, extraLabels []string) *Summary {
+ summaryVec := prometheus.NewSummaryVec(
+ prometheus.SummaryOpts{
+ Name: summaryName,
+ ConstLabels: constLabels,
+ Objectives: map[float64]float64{0.99: 0.001, 0.999:
0.0001},
+ MaxAge: 5 * time.Minute,
+ AgeBuckets: 5,
+ },
+ extraLabels,
+ )
+ prometheus.MustRegister(summaryVec)
+
+ return &Summary{
+ summary: summaryVec,
+ }
+}
+
+func (s *Summary) Observe(extraLabelsVals []string, value float64) {
+ s.summary.WithLabelValues(extraLabelsVals...).Observe(value)
+}
+
+func GetLocalHostName() string {
+ hostname, err := os.Hostname()
+ if err != nil {
+ return ""
+ }
+ return hostname
+}
diff --git a/go-client/pegasus/client.go b/go-client/pegasus/client.go
index aa20f1073..e14b28268 100644
--- a/go-client/pegasus/client.go
+++ b/go-client/pegasus/client.go
@@ -23,6 +23,8 @@ import (
"context"
"sync"
+ "github.com/apache/incubator-pegasus/go-client/config"
+ "github.com/apache/incubator-pegasus/go-client/metrics"
"github.com/apache/incubator-pegasus/go-client/pegalog"
"github.com/apache/incubator-pegasus/go-client/session"
)
@@ -44,13 +46,14 @@ type pegasusClient struct {
// protect the access of tables
mu sync.RWMutex
- metaMgr *session.MetaManager
- replicaMgr *session.ReplicaManager
+ metaMgr *session.MetaManager
+ replicaMgr *session.ReplicaManager
+ enableMetrics bool
}
// NewClient creates a new instance of pegasus client.
// It panics if the configured addresses are illegal.
-func NewClient(cfg Config) Client {
+func NewClient(cfg config.Config) Client {
c, err := newClientWithError(cfg)
if err != nil {
pegalog.GetLogger().Fatal(err)
@@ -59,17 +62,22 @@ func NewClient(cfg Config) Client {
return c
}
-func newClientWithError(cfg Config) (Client, error) {
+func newClientWithError(cfg config.Config) (Client, error) {
var err error
cfg.MetaServers, err = session.ResolveMetaAddr(cfg.MetaServers)
if err != nil {
return nil, err
}
+ if cfg.EnablePrometheus {
+ metrics.InitMetrics(cfg)
+ }
+
c := &pegasusClient{
- tables: make(map[string]TableConnector),
- metaMgr: session.NewMetaManager(cfg.MetaServers,
session.NewNodeSession),
- replicaMgr: session.NewReplicaManager(session.NewNodeSession),
+ tables: make(map[string]TableConnector),
+ metaMgr: session.NewMetaManager(cfg.MetaServers,
session.NewNodeSession),
+ replicaMgr:
session.NewReplicaManagerWithMetrics(session.NewNodeSession,
cfg.EnablePrometheus),
+ enableMetrics: cfg.EnablePrometheus,
}
return c, nil
}
@@ -101,7 +109,7 @@ func (p *pegasusClient) OpenTable(ctx context.Context,
tableName string) (TableC
}
var tb TableConnector
- tb, err := ConnectTable(ctx, tableName, p.metaMgr, p.replicaMgr)
+ tb, err := ConnectTable(ctx, tableName, p.metaMgr,
p.replicaMgr, p.enableMetrics)
if err != nil {
return nil, err
}
diff --git a/go-client/pegasus/client_test.go b/go-client/pegasus/client_test.go
index 0832f6048..e20178e64 100644
--- a/go-client/pegasus/client_test.go
+++ b/go-client/pegasus/client_test.go
@@ -26,6 +26,7 @@ import (
"testing"
"time"
+ "github.com/apache/incubator-pegasus/go-client/config"
"github.com/fortytw2/leaktest"
"github.com/stretchr/testify/assert"
)
@@ -33,7 +34,7 @@ import (
func TestPegasusClient_OpenTable(t *testing.T) {
defer leaktest.Check(t)()
- cfg := Config{
+ cfg := config.Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602",
"0.0.0.0:34603"},
}
@@ -64,7 +65,7 @@ func TestPegasusClient_OpenTableTimeout(t *testing.T) {
defer leaktest.Check(t)()
// make sure the port 8801 is not opened on your computer.
- cfg := Config{
+ cfg := config.Config{
MetaServers: []string{"0.0.0.0:8801"},
}
@@ -85,7 +86,7 @@ func TestPegasusClient_OpenTableTimeout(t *testing.T) {
func TestPegasusClient_ConcurrentOpenSameTable(t *testing.T) {
defer leaktest.Check(t)()
- cfg := Config{
+ cfg := config.Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602",
"0.0.0.0:34603"},
}
client := NewClient(cfg)
@@ -120,7 +121,7 @@ func TestPegasusClient_ConcurrentOpenSameTable(t
*testing.T) {
func TestPegasusClient_ConcurrentMetaQueries(t *testing.T) {
defer leaktest.Check(t)()
- cfg := Config{
+ cfg := config.Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602",
"0.0.0.0:34603"},
}
client := NewClient(cfg)
@@ -143,19 +144,19 @@ func TestPegasusClient_ConcurrentMetaQueries(t
*testing.T) {
}
func TestPegasusClient_New(t *testing.T) {
- c, err := newClientWithError(Config{
+ c, err := newClientWithError(config.Config{
MetaServers: []string{"127.0.0.1:34601", "127.0.0.1:34602",
"127.0.0.1:34603"},
})
assert.Nil(t, err)
_ = c.Close()
- c, err = newClientWithError(Config{
+ c, err = newClientWithError(config.Config{
MetaServers: []string{"127abc"},
})
assert.NotNil(t, err)
assert.Nil(t, c)
- _, err = newClientWithError(Config{
+ _, err = newClientWithError(config.Config{
MetaServers: []string{},
})
assert.NotNil(t, err)
diff --git a/go-client/pegasus/config.go b/go-client/pegasus/config.go
deleted file mode 100644
index 6af879018..000000000
--- a/go-client/pegasus/config.go
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package pegasus
-
-// Config is the configuration of pegasus client.
-type Config struct {
- MetaServers []string `json:"meta_servers"`
-}
diff --git a/go-client/pegasus/table_connector.go
b/go-client/pegasus/table_connector.go
index ca429d31e..f9d4ad51a 100644
--- a/go-client/pegasus/table_connector.go
+++ b/go-client/pegasus/table_connector.go
@@ -26,12 +26,14 @@ import (
"errors"
"fmt"
"math"
+ "strings"
"sync"
"time"
"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/apache/incubator-pegasus/go-client/idl/rrdb"
+ "github.com/apache/incubator-pegasus/go-client/metrics"
"github.com/apache/incubator-pegasus/go-client/pegalog"
"github.com/apache/incubator-pegasus/go-client/pegasus/op"
"github.com/apache/incubator-pegasus/go-client/session"
@@ -232,10 +234,11 @@ type pegasusTableConnector struct {
logger pegalog.Logger
- tableName string
- appID int32
- parts []*replicaNode
- mu sync.RWMutex
+ tableName string
+ appID int32
+ parts []*replicaNode
+ enableMetrics bool
+ mu sync.RWMutex
confUpdateCh chan bool
tom tomb.Tomb
@@ -248,13 +251,14 @@ type replicaNode struct {
// ConnectTable queries for the configuration of the given table, and set up
connection to
// the replicas which the table locates on.
-func ConnectTable(ctx context.Context, tableName string, meta
*session.MetaManager, replica *session.ReplicaManager) (TableConnector, error) {
+func ConnectTable(ctx context.Context, tableName string, meta
*session.MetaManager, replica *session.ReplicaManager, enableMetrics bool)
(TableConnector, error) {
p := &pegasusTableConnector{
- tableName: tableName,
- meta: meta,
- replica: replica,
- confUpdateCh: make(chan bool, 1),
- logger: pegalog.GetLogger(),
+ tableName: tableName,
+ meta: meta,
+ replica: replica,
+ enableMetrics: enableMetrics,
+ confUpdateCh: make(chan bool, 1),
+ logger: pegalog.GetLogger(),
}
// if the session became unresponsive, TableConnector auto-triggers
@@ -716,8 +720,32 @@ func (p *pegasusTableConnector) Incr(ctx context.Context,
hashKey []byte, sortKe
}
func (p *pegasusTableConnector) runPartitionOp(ctx context.Context, hashKey
[]byte, req op.Request, optype OpType) (interface{}, error) {
+ var errResult error
+ if p.enableMetrics {
+ start := time.Now()
+ defer func() {
+ status := "success"
+ if errResult != nil {
+ if errors.Is(ctx.Err(),
context.DeadlineExceeded) {
+ status = "timeout"
+ } else {
+ status = "fail"
+ }
+ }
+ labels := []string{
+ p.tableName,
+ optype.String(),
+ status,
+ strings.Join(p.meta.GetMetaIPAddrs(), ","),
+ }
+ elapsed := time.Since(start).Nanoseconds()
+ metrics.PegasusClientOperationsSummary.Observe(labels,
float64(elapsed))
+ }()
+ }
+
// validate arguments
if err := req.Validate(); err != nil {
+ errResult = err
return 0, WrapError(err, optype)
}
partitionHash := crc64Hash(hashKey)
@@ -727,6 +755,7 @@ func (p *pegasusTableConnector) runPartitionOp(ctx
context.Context, hashKey []by
confUpdated, retry, err = p.handleReplicaError(err, part)
return
})
+ errResult = err
return res, p.wrapPartitionError(err, gpid, part, optype)
}
diff --git a/go-client/pegasus/table_connector_test.go b/go-client/pegasus/table_connector_test.go
index 4d7de11b9..75823c18c 100644
--- a/go-client/pegasus/table_connector_test.go
+++ b/go-client/pegasus/table_connector_test.go
@@ -24,12 +24,14 @@ import (
"context"
"errors"
"fmt"
+
"math"
"sort"
"sync"
"testing"
"time"
+ "github.com/apache/incubator-pegasus/go-client/config"
"github.com/apache/incubator-pegasus/go-client/idl/base"
"github.com/apache/incubator-pegasus/go-client/idl/replication"
"github.com/apache/incubator-pegasus/go-client/pegalog"
@@ -87,7 +89,7 @@ func testSingleKeyOperations(t *testing.T, tb TableConnector,
hashKey []byte, so
assert.Nil(t, tb.Del(context.Background(), hashKey, sortKey))
}
-var testingCfg = Config{
+var testingCfg = config.Config{
MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602",
"0.0.0.0:34603"},
}
diff --git a/go-client/session/meta_call.go b/go-client/session/meta_call.go
index 6b68a59cb..f86b9face 100644
--- a/go-client/session/meta_call.go
+++ b/go-client/session/meta_call.go
@@ -141,7 +141,7 @@ func (c *metaCall) issueSingleMeta(ctx context.Context,
curLeader int) bool {
c.lock.Lock()
c.metaIPAddrs = append(c.metaIPAddrs, addr)
c.metas = append(c.metas, &metaSession{
- NodeSession: newNodeSession(addr, NodeTypeMeta),
+ NodeSession: newNodeSession(addr, NodeTypeMeta,
DisableMetrics),
logger: pegalog.GetLogger(),
})
curLeader = len(c.metas) - 1
diff --git a/go-client/session/meta_session.go b/go-client/session/meta_session.go
index 9ead2b85a..252839f1e 100644
--- a/go-client/session/meta_session.go
+++ b/go-client/session/meta_session.go
@@ -77,7 +77,7 @@ func NewMetaManager(addrs []string, creator
NodeSessionCreator) *MetaManager {
metaIPAddrs := make([]string, len(addrs))
for i, addr := range addrs {
metas[i] = &metaSession{
- NodeSession: creator(addr, NodeTypeMeta),
+ NodeSession: creator(addr, NodeTypeMeta,
DisableMetrics),
logger: pegalog.GetLogger(),
}
metaIPAddrs[i] = addr
@@ -120,6 +120,10 @@ func (m *MetaManager) QueryConfig(ctx context.Context,
tableName string) (*repli
return nil, err
}
+func (m *MetaManager) GetMetaIPAddrs() []string {
+ return m.metaIPAddrs
+}
+
func (m *MetaManager) getCurrentLeader() int {
m.mu.RLock()
defer m.mu.RUnlock()
diff --git a/go-client/session/replica_session.go b/go-client/session/replica_session.go
index 09de6e6e3..8edd39fa5 100644
--- a/go-client/session/replica_session.go
+++ b/go-client/session/replica_session.go
@@ -188,6 +188,8 @@ type ReplicaManager struct {
creator NodeSessionCreator
unresponsiveHandler UnresponsiveHandler
+
+ enableMetrics bool
}
// UnresponsiveHandler is a callback executed when the session is in
unresponsive state.
@@ -205,7 +207,7 @@ func (rm *ReplicaManager) GetReplica(addr string)
*ReplicaSession {
if _, ok := rm.replicas[addr]; !ok {
r := &ReplicaSession{
- NodeSession: rm.creator(addr, NodeTypeReplica),
+ NodeSession: rm.creator(addr, NodeTypeReplica,
rm.enableMetrics),
}
withUnresponsiveHandler(r.NodeSession, rm.unresponsiveHandler)
rm.replicas[addr] = r
@@ -214,9 +216,14 @@ func (rm *ReplicaManager) GetReplica(addr string)
*ReplicaSession {
}
func NewReplicaManager(creator NodeSessionCreator) *ReplicaManager {
+ return NewReplicaManagerWithMetrics(creator, DisableMetrics)
+}
+
+func NewReplicaManagerWithMetrics(creator NodeSessionCreator, enableMetrics
bool) *ReplicaManager {
return &ReplicaManager{
- replicas: make(map[string]*ReplicaSession),
- creator: creator,
+ replicas: make(map[string]*ReplicaSession),
+ creator: creator,
+ enableMetrics: enableMetrics,
}
}
diff --git a/go-client/session/session.go b/go-client/session/session.go
index 20aa585ae..0f226171f 100644
--- a/go-client/session/session.go
+++ b/go-client/session/session.go
@@ -28,6 +28,7 @@ import (
"time"
"github.com/apache/incubator-pegasus/go-client/idl/base"
+ "github.com/apache/incubator-pegasus/go-client/metrics"
"github.com/apache/incubator-pegasus/go-client/pegalog"
"github.com/apache/incubator-pegasus/go-client/rpc"
"gopkg.in/tomb.v2"
@@ -47,6 +48,9 @@ const (
// LatencyTracingThreshold means RPC's latency higher than the
threshold (1000ms) will be traced
LatencyTracingThreshold = time.Millisecond * 1000
+
+ // DisableMetrics controls whether to collect RPC message size metrics.
+ DisableMetrics = false
)
// NodeSession represents the network session to a node
@@ -68,7 +72,7 @@ type NodeSession interface {
// NodeSessionCreator creates an instance of NodeSession,
// receiving argument `string` as host address, `NodeType`
// as the type of the node.
-type NodeSessionCreator func(string, NodeType) NodeSession
+type NodeSessionCreator func(string, NodeType, bool) NodeSession
// An implementation of NodeSession.
type nodeSession struct {
@@ -95,6 +99,8 @@ type nodeSession struct {
unresponsiveHandler UnresponsiveHandler
lastWriteTime int64
+
+ enableMetrics bool
}
// withUnresponsiveHandler enables the session to handle the event when a
network connection becomes unresponsive.
@@ -111,16 +117,17 @@ type requestListener struct {
call *PegasusRpcCall
}
-func newNodeSessionAddr(addr string, ntype NodeType) *nodeSession {
+func newNodeSessionAddr(addr string, ntype NodeType, enableMetrics bool)
*nodeSession {
return &nodeSession{
- logger: pegalog.GetLogger(),
- ntype: ntype,
- seqId: 0,
- codec: NewPegasusCodec(),
- pendingResp: make(map[int32]*requestListener),
- reqc: make(chan *requestListener),
- addr: addr,
- tom: &tomb.Tomb{},
+ logger: pegalog.GetLogger(),
+ ntype: ntype,
+ seqId: 0,
+ codec: NewPegasusCodec(),
+ pendingResp: make(map[int32]*requestListener),
+ reqc: make(chan *requestListener),
+ addr: addr,
+ tom: &tomb.Tomb{},
+ enableMetrics: enableMetrics,
//
redialc: make(chan bool, 1),
@@ -130,14 +137,14 @@ func newNodeSessionAddr(addr string, ntype NodeType)
*nodeSession {
// NewNodeSession always returns a non-nil value even when the
// connection attempt failed.
// Each nodeSession corresponds to an RpcConn.
-func NewNodeSession(addr string, ntype NodeType) NodeSession {
- return newNodeSession(addr, ntype)
+func NewNodeSession(addr string, ntype NodeType, enableMetrics bool)
NodeSession {
+ return newNodeSession(addr, ntype, enableMetrics)
}
-func newNodeSession(addr string, ntype NodeType) *nodeSession {
+func newNodeSession(addr string, ntype NodeType, enableMetrics bool)
*nodeSession {
logger := pegalog.GetLogger()
- n := newNodeSessionAddr(addr, ntype)
+ n := newNodeSessionAddr(addr, ntype, enableMetrics)
logger.Printf("create session with %s", n)
n.conn = rpc.NewRpcConn(addr)
@@ -400,6 +407,10 @@ func (n *nodeSession) CallWithGpid(ctx context.Context,
gpid *base.Gpid, partiti
}
func (n *nodeSession) writeRequest(r *PegasusRpcCall) error {
+ if n.enableMetrics {
+ metrics.PegasusClientRpcSizeSummary.Observe([]string{r.Name},
float64(len(r.RawReq)))
+ }
+
return n.conn.Write(r.RawReq)
}
diff --git a/go-client/session/session_test.go b/go-client/session/session_test.go
index 8ec3e52cd..1860b186b 100644
--- a/go-client/session/session_test.go
+++ b/go-client/session/session_test.go
@@ -39,7 +39,7 @@ import (
)
func newFakeNodeSession(reader io.Reader, writer io.Writer) *nodeSession {
- n := newNodeSessionAddr("", NodeTypeMeta)
+ n := newNodeSessionAddr("", NodeTypeMeta, DisableMetrics)
n.conn = rpc.NewFakeRpcConn(reader, writer)
n.codec = &MockCodec{}
return n
@@ -47,7 +47,7 @@ func newFakeNodeSession(reader io.Reader, writer io.Writer)
*nodeSession {
func newMetaSession(addr string) *metaSession {
return &metaSession{
- NodeSession: newNodeSession(addr, NodeTypeMeta),
+ NodeSession: newNodeSession(addr, NodeTypeMeta, DisableMetrics),
logger: pegalog.GetLogger(),
}
}
@@ -93,7 +93,7 @@ func TestNodeSession_LoopForDialingSuccess(t *testing.T) {
defer leaktest.Check(t)()
addr := "www.baidu.com:80"
- n := newNodeSessionAddr(addr, "meta")
+ n := newNodeSessionAddr(addr, "meta", DisableMetrics)
n.conn = rpc.NewRpcConn(addr)
n.tom.Go(n.loopForDialing)
@@ -116,7 +116,7 @@ func TestNodeSession_LoopForDialingCancelled(t *testing.T) {
defer leaktest.Check(t)()
addr := "www.baidu.com:12321"
- n := newNodeSessionAddr(addr, "meta")
+ n := newNodeSessionAddr(addr, "meta", DisableMetrics)
n.conn = rpc.NewRpcConn(addr)
n.tom.Go(n.loopForDialing)
@@ -169,7 +169,7 @@ func TestNodeSession_WaitUntilSessionReady(t *testing.T) {
defer leaktest.Check(t)()
func() {
- n := newNodeSession("www.baidu.com:12321", "meta")
+ n := newNodeSession("www.baidu.com:12321", "meta",
DisableMetrics)
defer n.Close()
ctx, cancel := context.WithTimeout(context.Background(),
time.Millisecond*50)
@@ -181,7 +181,7 @@ func TestNodeSession_WaitUntilSessionReady(t *testing.T) {
}()
func() {
- n := newNodeSession("0.0.0.0:8800", "meta")
+ n := newNodeSession("0.0.0.0:8800", "meta", DisableMetrics)
defer n.Close()
err := n.waitUntilSessionReady(context.Background())
@@ -195,7 +195,7 @@ func TestNodeSession_CallToEcho(t *testing.T) {
defer leaktest.Check(t)()
// start echo server first
- n := newNodeSession("0.0.0.0:8800", NodeTypeMeta)
+ n := newNodeSession("0.0.0.0:8800", NodeTypeMeta, DisableMetrics)
defer n.Close()
var expected []byte
@@ -237,7 +237,7 @@ func TestNodeSession_ConcurrentCallToEcho(t *testing.T) {
defer leaktest.Check(t)()
// start echo server first
- n := newNodeSession("0.0.0.0:8800", NodeTypeMeta)
+ n := newNodeSession("0.0.0.0:8800", NodeTypeMeta, DisableMetrics)
mockCodec := &MockCodec{}
mockCodec.MockMarshal(func(v interface{}) ([]byte, error) {
@@ -320,7 +320,7 @@ func TestNodeSession_RestartConnection(t *testing.T) {
func TestNodeSession_ReceiveErrorCode(t *testing.T) {
defer leaktest.Check(t)()
- n := newNodeSession("0.0.0.0:8800", NodeTypeMeta)
+ n := newNodeSession("0.0.0.0:8800", NodeTypeMeta, DisableMetrics)
defer n.Close()
arg := rrdb.NewMetaQueryCfgArgs()
@@ -353,7 +353,7 @@ func TestNodeSession_Redial(t *testing.T) {
defer leaktest.Check(t)()
addr := "0.0.0.0:8800"
- n := newNodeSessionAddr(addr, "meta")
+ n := newNodeSessionAddr(addr, "meta", DisableMetrics)
n.conn = rpc.NewRpcConn(addr)
defer n.Close()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pegasus.apache.org
For additional commands, e-mail: commits-help@pegasus.apache.org