This is an automated email from the ASF dual-hosted git repository.

wangdan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git


The following commit(s) were added to refs/heads/master by this push:
     new 89b7c33b2 feat(go-client): introduce Prometheus for go-client 
monitoring and added multiple metric collection sites (#2285)
89b7c33b2 is described below

commit 89b7c33b2d387b3cd28af05a9b916869b529d939
Author: nanorth <[email protected]>
AuthorDate: Mon Nov 10 15:23:36 2025 +0800

    feat(go-client): introduce Prometheus for go-client monitoring and added 
multiple metric collection sites (#2285)
    
    ### What problem does this PR solve? <!--add issue link with summary if 
exists-->
    This PR introduces Prometheus for go-client monitoring, and adds multiple 
metric collection points to enhance observability of the system.
    
    ### What is changed and how does it work?
    This PR integrates the Prometheus client libraries 
(github.com/prometheus/client_golang and github.com/prometheus/client_model) as 
tools for metric instrumentation and data reporting.
    
    ### Checklist <!--REMOVE the items that are not applicable-->
    
    ##### Tests <!-- At least one of them must be included. -->
    
    - Manual test (add detailed scripts or steps below)
    Used go-client to invoke several interfaces and verified that the metrics 
were reported correctly to the Prometheus monitoring service.
    <img width="1664" height="562" alt="image" 
src="https://github.com/user-attachments/assets/5976457b-e8f4-4225-b53f-1d07b0f358da"
 />
    
    
    ##### Code changes
    
    - Has exported function/method change
    Added metric collection for operation counts and latencies in `func (p 
*pegasusTableConnector) runPartitionOp`.
    Added RPC request size metric collection in `func (n *nodeSession) 
writeRequest`.
    - Has exported variable/fields change
    Added a new field `enablePerfCounter` in `nodeSession` to distinguish 
between meta and replica nodes. Performance metrics are collected only for 
replica nodes. The constructor has been updated accordingly.
    - Package structure change
    The `Config` struct has been moved into a separate package to avoid 
circular dependencies.
    
    ##### Related changes
    - Need to update the documentation
    Since the `Config` struct has been moved to a separate package, the example 
code has also been updated to reflect this change.
---
 go-client/admin/client.go                    |   3 +-
 go-client/admin/client_test.go               |   3 +-
 go-client/admin/remote_cmd_client.go         |   2 +-
 go-client/config/config.go                   |  62 +++++++++++++
 go-client/example/full_scan.md               | 131 ++++++++++++++-------------
 go-client/example/main.go                    |   3 +-
 go-client/example/pegasus-client-config.json |   8 +-
 go-client/go.mod                             |  14 ++-
 go-client/go.sum                             |  34 +++++--
 go-client/integration/failover-test/main.go  |   3 +-
 go-client/metrics/metrics.go                 |  97 ++++++++++++++++++++
 go-client/pegasus/client.go                  |  24 +++--
 go-client/pegasus/client_test.go             |  15 +--
 go-client/pegasus/config.go                  |  25 -----
 go-client/pegasus/table_connector.go         |  49 ++++++++--
 go-client/pegasus/table_connector_test.go    |   4 +-
 go-client/session/meta_call.go               |   2 +-
 go-client/session/meta_session.go            |   6 +-
 go-client/session/replica_session.go         |  13 ++-
 go-client/session/session.go                 |  39 +++++---
 go-client/session/session_test.go            |  20 ++--
 21 files changed, 395 insertions(+), 162 deletions(-)

diff --git a/go-client/admin/client.go b/go-client/admin/client.go
index d51a0b52a..81e607d5a 100644
--- a/go-client/admin/client.go
+++ b/go-client/admin/client.go
@@ -172,7 +172,8 @@ func (c *rpcBasedClient) CreateTable(tableName string, 
partitionCount int32, rep
                        AppType:        "pegasus",
                        IsStateful:     true,
                        Envs:           envs,
-               }}
+               },
+       }
 
        var appID int32
        var respErr error
diff --git a/go-client/admin/client_test.go b/go-client/admin/client_test.go
index a263c7c5a..d768c025a 100644
--- a/go-client/admin/client_test.go
+++ b/go-client/admin/client_test.go
@@ -25,6 +25,7 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/incubator-pegasus/go-client/config"
        "github.com/apache/incubator-pegasus/go-client/idl/admin"
        "github.com/apache/incubator-pegasus/go-client/idl/replication"
        "github.com/apache/incubator-pegasus/go-client/pegasus"
@@ -110,7 +111,7 @@ func TestAdmin_CreateTableMustAvailable(t *testing.T) {
        }
 
        // ensures the created table must be available for read and write
-       rwClient := pegasus.NewClient(pegasus.Config{
+       rwClient := pegasus.NewClient(config.Config{
                MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", 
"0.0.0.0:34603"},
        })
        defer func() {
diff --git a/go-client/admin/remote_cmd_client.go 
b/go-client/admin/remote_cmd_client.go
index 8acf64467..5bedf7faa 100644
--- a/go-client/admin/remote_cmd_client.go
+++ b/go-client/admin/remote_cmd_client.go
@@ -33,7 +33,7 @@ type RemoteCmdClient struct {
 // NewRemoteCmdClient returns an instance of RemoteCmdClient.
 func NewRemoteCmdClient(addr string, nodeType session.NodeType) 
*RemoteCmdClient {
        return &RemoteCmdClient{
-               session: session.NewNodeSession(addr, nodeType),
+               session: session.NewNodeSession(addr, nodeType, 
session.DisableMetrics),
        }
 }
 
diff --git a/go-client/config/config.go b/go-client/config/config.go
new file mode 100644
index 000000000..7e82226cd
--- /dev/null
+++ b/go-client/config/config.go
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package config
+
+// Config is the configuration of pegasus client.
+type Config struct {
+       MetaServers           []string          `json:"meta_servers"`
+       PrometheusConstLabels map[string]string `json:"prometheus_const_labels"`
+       EnablePrometheus      bool              `json:"enable_prometheus"`
+       PrometheusPort        int               `json:"prometheus_port"`
+}
+
+type Option func(*Config)
+
+func NewConfig(metaServers []string, opts ...Option) *Config {
+       cfg := &Config{
+               MetaServers:           metaServers,
+               PrometheusConstLabels: make(map[string]string),
+               EnablePrometheus:      false,
+               PrometheusPort:        9090,
+       }
+
+       for _, opt := range opts {
+               opt(cfg)
+       }
+       return cfg
+}
+
+func WithPerfCounterTags(tags map[string]string) Option {
+       return func(c *Config) {
+               c.PrometheusConstLabels = tags
+       }
+}
+
+func WithEnablePrometheus(enable bool) Option {
+       return func(c *Config) {
+               c.EnablePrometheus = enable
+       }
+}
+
+func WithPrometheusPort(port int) Option {
+       return func(c *Config) {
+               c.PrometheusPort = port
+       }
+}
diff --git a/go-client/example/full_scan.md b/go-client/example/full_scan.md
index 524d53615..20dd617d7 100644
--- a/go-client/example/full_scan.md
+++ b/go-client/example/full_scan.md
@@ -34,79 +34,80 @@ lower than []bytes(oneYearAgoTs).
 package main
 
 import (
-    "context"
-    "encoding/binary"
-    "time"
+       "context"
+       "encoding/binary"
+       "time"
 
-    "github.com/apache/incubator-pegasus/go-client/pegalog"
-    "github.com/apache/incubator-pegasus/go-client/pegasus"
+       "github.com/apache/incubator-pegasus/go-client/config"
+       "github.com/apache/incubator-pegasus/go-client/pegalog"
+       "github.com/apache/incubator-pegasus/go-client/pegasus"
 )
 
 func searchHistoryOneYearAgo() {
-    // Customize where the pegasus-go-client's logs reside.
-    pegalog.SetLogger(pegalog.NewLogrusLogger(&pegalog.LogrusConfig{
-        Filename: "./pegasus.log",
-    }))
-    logger := pegalog.GetLogger()
+       // Customize where the pegasus-go-client's logs reside.
+       pegalog.SetLogger(pegalog.NewLogrusLogger(&pegalog.LogrusConfig{
+               Filename: "./pegasus.log",
+       }))
+       logger := pegalog.GetLogger()
 
-    // Configure the meta addresses to access the pegasus cluster.
-    cfg := &pegasus.Config{
-        MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34601"},
-    }
-    c := pegasus.NewClient(*cfg)
+       // Configure the meta addresses to access the pegasus cluster.
+       cfg := &config.Config{
+               MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34601"},
+       }
+       c := pegasus.NewClient(*cfg)
 
-    // Establish the connections to replica-servers.
-    ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
-    tb, err := c.OpenTable(ctx, "user_history")
-    if err != nil {
-        logger.Print(err)
-        return
-    }
-    logger.Print("opened table user_history")
+       // Establish the connections to replica-servers.
+       ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
+       tb, err := c.OpenTable(ctx, "user_history")
+       if err != nil {
+               logger.Print(err)
+               return
+       }
+       logger.Print("opened table user_history")
 
-    // Set up the scanners.
-    ctx, _ = context.WithTimeout(context.Background(), time.Second*10)
-    sopts := &pegasus.ScannerOptions{
-        BatchSize: 20,
-        // Values can be optimized out during scanning to reduce the workload.
-        NoValue: true,
-    }
-    scanners, err := tb.GetUnorderedScanners(ctx, 16, sopts)
-    if err != nil {
-        logger.Print(err)
-    }
-    logger.Printf("opened %d scanners", len(scanners))
-    oneYearAgo := int(time.Now().AddDate(-1, 0, 0).UnixNano() / 1000 / 1000)
-    for i, scanner := range scanners {
-        // Iterates sequentially.
+       // Set up the scanners.
+       ctx, _ = context.WithTimeout(context.Background(), time.Second*10)
+       sopts := &pegasus.ScannerOptions{
+               BatchSize: 20,
+               // Values can be optimized out during scanning to reduce the 
workload.
+               NoValue: true,
+       }
+       scanners, err := tb.GetUnorderedScanners(ctx, 16, sopts)
+       if err != nil {
+               logger.Print(err)
+       }
+       logger.Printf("opened %d scanners", len(scanners))
+       oneYearAgo := int(time.Now().AddDate(-1, 0, 0).UnixNano() / 1000 / 1000)
+       for i, scanner := range scanners {
+               // Iterates sequentially.
 
-        start := time.Now()
-        cnt := 0
-        for true {
-            ctx, _ = context.WithTimeout(context.Background(), time.Second*10)
-            completed, hashKey, sortKey, _, err := scanner.Next(ctx)
-            if err != nil {
-                logger.Print(err)
-                return
-            }
-            if completed {
-                logger.Printf("scanner %d completes", i)
-                break
-            }
-            if len(sortKey) == 8 {
-                res := int(binary.BigEndian.Uint64(sortKey))
-                if res < oneYearAgo {
-                    logger.Printf("hashkey=%s, sortkey=%d\n", string(hashKey), 
res)
-                }
-            }
+               start := time.Now()
+               cnt := 0
+               for true {
+                       ctx, _ = context.WithTimeout(context.Background(), 
time.Second*10)
+                       completed, hashKey, sortKey, _, err := scanner.Next(ctx)
+                       if err != nil {
+                               logger.Print(err)
+                               return
+                       }
+                       if completed {
+                               logger.Printf("scanner %d completes", i)
+                               break
+                       }
+                       if len(sortKey) == 8 {
+                               res := int(binary.BigEndian.Uint64(sortKey))
+                               if res < oneYearAgo {
+                                       logger.Printf("hashkey=%s, 
sortkey=%d\n", string(hashKey), res)
+                               }
+                       }
 
-            cnt++
-            if time.Now().Sub(start) > time.Minute {
-                logger.Printf("scan 1-min, %d rows in total", cnt)
-                start = time.Now()
-            }
-        }
-    }
-    logger.Print("program exits")
+                       cnt++
+                       if time.Now().Sub(start) > time.Minute {
+                               logger.Printf("scan 1-min, %d rows in total", 
cnt)
+                               start = time.Now()
+                       }
+               }
+       }
+       logger.Print("program exits")
 }
 ```
diff --git a/go-client/example/main.go b/go-client/example/main.go
index cab1c462e..f81f2d2f8 100644
--- a/go-client/example/main.go
+++ b/go-client/example/main.go
@@ -27,6 +27,7 @@ import (
        "path/filepath"
        "time"
 
+       "github.com/apache/incubator-pegasus/go-client/config"
        "github.com/apache/incubator-pegasus/go-client/pegalog"
        "github.com/apache/incubator-pegasus/go-client/pegasus"
 )
@@ -48,7 +49,7 @@ func main() {
        }))
        logger := pegalog.GetLogger()
 
-       cfg := &pegasus.Config{}
+       cfg := &config.Config{}
        json.Unmarshal(rawCfg, cfg)
        c := pegasus.NewClient(*cfg)
 
diff --git a/go-client/example/pegasus-client-config.json 
b/go-client/example/pegasus-client-config.json
index 1cf688269..2db1526a7 100644
--- a/go-client/example/pegasus-client-config.json
+++ b/go-client/example/pegasus-client-config.json
@@ -3,5 +3,11 @@
     "127.0.0.1:34601",
     "127.0.0.1:34602",
     "127.0.0.1:34603"
-  ]
+  ],
+  "prometheus_const_labels": {
+    "k1": "v1",
+    "k2": "v2"
+  },
+  "enable_prometheus": false,
+  "prometheus_port": 9090
 }
\ No newline at end of file
diff --git a/go-client/go.mod b/go-client/go.mod
index 08a1cea1d..911d10045 100644
--- a/go-client/go.mod
+++ b/go-client/go.mod
@@ -24,7 +24,8 @@ require (
        github.com/apache/thrift v0.13.0
        github.com/cenkalti/backoff/v4 v4.1.0
        github.com/fortytw2/leaktest v1.3.0
-       github.com/sirupsen/logrus v1.4.2
+       github.com/prometheus/client_golang v1.18.0
+       github.com/sirupsen/logrus v1.6.0
        github.com/stretchr/testify v1.4.0
        gopkg.in/natefinch/lumberjack.v2 v2.0.0
        gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
@@ -33,10 +34,17 @@ require (
 
 require (
        github.com/BurntSushi/toml v0.3.1 // indirect
+       github.com/beorn7/perks v1.0.1 // indirect
+       github.com/cespare/xxhash/v2 v2.2.0 // indirect
        github.com/davecgh/go-spew v1.1.1 // indirect
-       github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect
+       github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
+       github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
        github.com/pmezard/go-difflib v1.0.0 // indirect
+       github.com/prometheus/client_model v0.5.0 // indirect
+       github.com/prometheus/common v0.45.0 // indirect
+       github.com/prometheus/procfs v0.12.0 // indirect
        golang.org/x/net v0.38.0 // indirect
        golang.org/x/sys v0.31.0 // indirect
-       gopkg.in/yaml.v2 v2.2.8 // indirect
+       google.golang.org/protobuf v1.32.0 // indirect
+       gopkg.in/yaml.v2 v2.4.0 // indirect
 )
diff --git a/go-client/go.sum b/go-client/go.sum
index f4a3321a9..94db4da94 100644
--- a/go-client/go.sum
+++ b/go-client/go.sum
@@ -7,8 +7,12 @@ github.com/agiledragon/gomonkey v2.0.2+incompatible 
h1:eXKi9/piiC3cjJD1658mEE2o3
 github.com/agiledragon/gomonkey v2.0.2+incompatible/go.mod 
h1:2NGfXu1a80LLr2cmWXGBDaHEjb1idR6+FVlX5T3D9hw=
 github.com/apache/thrift v0.13.0 
h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
 github.com/apache/thrift v0.13.0/go.mod 
h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
+github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
+github.com/beorn7/perks v1.0.1/go.mod 
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/cenkalti/backoff/v4 v4.1.0 
h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
 github.com/cenkalti/backoff/v4 v4.1.0/go.mod 
h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
+github.com/cespare/xxhash/v2 v2.2.0 
h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
+github.com/cespare/xxhash/v2 v2.2.0/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -32,6 +36,7 @@ github.com/golang/protobuf 
v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+
 github.com/golang/protobuf v1.2.0/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/protobuf v1.3.1/go.mod 
h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/google/go-cmp v0.3.0/go.mod 
h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
 github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod 
h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI=
 github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/uuid v1.1.1/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -42,14 +47,16 @@ github.com/json-iterator/go 
v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBv
 github.com/json-iterator/go v1.1.7/go.mod 
h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
 github.com/kisielk/errcheck v1.2.0/go.mod 
h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
 github.com/kisielk/gotool v1.0.0/go.mod 
h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
-github.com/konsorten/go-windows-terminal-sequences v1.0.1 
h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
-github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
-github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/konsorten/go-windows-terminal-sequences v1.0.3 
h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
+github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod 
h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/kr/pretty v0.1.0/go.mod 
h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
 github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod 
h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod 
h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
+github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 
h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
+github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod 
h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
 github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod 
h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v0.0.0-20180320133207-05fbef0ca5da/go.mod 
h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
@@ -65,12 +72,20 @@ github.com/onsi/gomega v1.5.0/go.mod 
h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa
 github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod 
h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/sirupsen/logrus v1.4.2 
h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
-github.com/sirupsen/logrus v1.4.2/go.mod 
h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
+github.com/prometheus/client_golang v1.18.0 
h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
+github.com/prometheus/client_golang v1.18.0/go.mod 
h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
+github.com/prometheus/client_model v0.5.0 
h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
+github.com/prometheus/client_model v0.5.0/go.mod 
h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
+github.com/prometheus/common v0.45.0 
h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM=
+github.com/prometheus/common v0.45.0/go.mod 
h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY=
+github.com/prometheus/procfs v0.12.0 
h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
+github.com/prometheus/procfs v0.12.0/go.mod 
h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
+github.com/rogpeppe/go-internal v1.10.0 
h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
+github.com/sirupsen/logrus v1.6.0 
h1:UBcNElsrwanuuMsnGSlYmtmgbb23qDR5dG+6X6Oo89I=
+github.com/sirupsen/logrus v1.6.0/go.mod 
h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
 github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod 
h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
 github.com/spf13/pflag v1.0.5/go.mod 
h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
 github.com/stretchr/objx v0.1.0/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
-github.com/stretchr/objx v0.1.1/go.mod 
h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.2.2/go.mod 
h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod 
h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
@@ -96,9 +111,11 @@ golang.org/x/text v0.3.2/go.mod 
h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+google.golang.org/protobuf v1.32.0 
h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I=
+google.golang.org/protobuf v1.32.0/go.mod 
h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 
h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod 
h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c 
h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
 gopkg.in/fsnotify.v1 v1.4.7/go.mod 
h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0 
h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8=
@@ -108,8 +125,9 @@ gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 
h1:yiW+nvdHb9LVqSHQBXfZCieqV
 gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod 
h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
+gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
 k8s.io/apimachinery v0.16.13 h1:E40YK/NhqhUubG44ZHQULa4Pn+8NnXMAE6awvQ97Pyg=
 k8s.io/apimachinery v0.16.13/go.mod 
h1:4HMHS3mDHtVttspuuhrJ1GGr/0S9B6iWYWZ57KnnZqQ=
 k8s.io/gengo v0.0.0-20190128074634-0689ccc1d7d6/go.mod 
h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
diff --git a/go-client/integration/failover-test/main.go 
b/go-client/integration/failover-test/main.go
index 481059283..356f39d56 100644
--- a/go-client/integration/failover-test/main.go
+++ b/go-client/integration/failover-test/main.go
@@ -25,6 +25,7 @@ import (
        "strings"
        "time"
 
+       "github.com/apache/incubator-pegasus/go-client/config"
        "github.com/apache/incubator-pegasus/go-client/pegalog"
        "github.com/apache/incubator-pegasus/go-client/pegasus"
 )
@@ -35,7 +36,7 @@ import (
 // Pegasus cluster is going well, whether the go-client gets work in expected 
time.
 
 func main() {
-       client := pegasus.NewClient(pegasus.Config{MetaServers: 
[]string{"172.21.0.11:35601"}})
+       client := pegasus.NewClient(config.Config{MetaServers: 
[]string{"172.21.0.11:35601"}})
        defer client.Close()
 
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
diff --git a/go-client/metrics/metrics.go b/go-client/metrics/metrics.go
new file mode 100644
index 000000000..2b19caf69
--- /dev/null
+++ b/go-client/metrics/metrics.go
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package metrics
+
+import (
+       "fmt"
+       "net/http"
+       "os"
+       "sync"
+       "time"
+
+       "github.com/apache/incubator-pegasus/go-client/config"
+       "github.com/apache/incubator-pegasus/go-client/pegalog"
+       "github.com/prometheus/client_golang/prometheus"
+       "github.com/prometheus/client_golang/prometheus/promhttp"
+)
+
+type Summary struct {
+       summary *prometheus.SummaryVec
+}
+
+var (
+       once                           sync.Once
+       constLabels                    map[string]string
+       PegasusClientOperationsSummary *Summary
+       PegasusClientRpcSizeSummary    *Summary
+)
+
+func InitMetrics(cfg config.Config) {
+       once.Do(func() {
+               constLabels = cfg.PrometheusConstLabels
+               constLabels["endpoint"] = GetLocalHostName()
+
+               port := 9090
+               if cfg.PrometheusPort > 0 {
+                       port = cfg.PrometheusPort
+               }
+               go func() {
+                       http.Handle("/metrics", promhttp.Handler())
+                       addr := fmt.Sprintf(":%d", port)
+                       pegalog.GetLogger().Print("Starting Prometheus metrics 
server on", addr)
+                       if err := http.ListenAndServe(addr, nil); err != nil {
+                               pegalog.GetLogger().Fatal("Failed to start 
Prometheus metrics server:", err)
+                       }
+               }()
+
+               PegasusClientOperationsSummary = 
RegisterPromSummary("pegasus_client_operations", []string{"table", "operation", 
"status", "meta_addresses"})
+               PegasusClientRpcSizeSummary = 
RegisterPromSummary("pegasus_client_rpc_size", []string{"type"})
+       })
+}
+
+func RegisterPromSummary(summaryName string, extraLabels []string) *Summary {
+       summaryVec := prometheus.NewSummaryVec(
+               prometheus.SummaryOpts{
+                       Name:        summaryName,
+                       ConstLabels: constLabels,
+                       Objectives:  map[float64]float64{0.99: 0.001, 0.999: 
0.0001},
+                       MaxAge:      5 * time.Minute,
+                       AgeBuckets:  5,
+               },
+               extraLabels,
+       )
+       prometheus.MustRegister(summaryVec)
+
+       return &Summary{
+               summary: summaryVec,
+       }
+}
+
+func (s *Summary) Observe(extraLabelsVals []string, value float64) {
+       s.summary.WithLabelValues(extraLabelsVals...).Observe(value)
+}
+
+func GetLocalHostName() string {
+       hostname, err := os.Hostname()
+       if err != nil {
+               return ""
+       }
+       return hostname
+}
diff --git a/go-client/pegasus/client.go b/go-client/pegasus/client.go
index aa20f1073..e14b28268 100644
--- a/go-client/pegasus/client.go
+++ b/go-client/pegasus/client.go
@@ -23,6 +23,8 @@ import (
        "context"
        "sync"
 
+       "github.com/apache/incubator-pegasus/go-client/config"
+       "github.com/apache/incubator-pegasus/go-client/metrics"
        "github.com/apache/incubator-pegasus/go-client/pegalog"
        "github.com/apache/incubator-pegasus/go-client/session"
 )
@@ -44,13 +46,14 @@ type pegasusClient struct {
        // protect the access of tables
        mu sync.RWMutex
 
-       metaMgr    *session.MetaManager
-       replicaMgr *session.ReplicaManager
+       metaMgr       *session.MetaManager
+       replicaMgr    *session.ReplicaManager
+       enableMetrics bool
 }
 
 // NewClient creates a new instance of pegasus client.
 // It panics if the configured addresses are illegal.
-func NewClient(cfg Config) Client {
+func NewClient(cfg config.Config) Client {
        c, err := newClientWithError(cfg)
        if err != nil {
                pegalog.GetLogger().Fatal(err)
@@ -59,17 +62,22 @@ func NewClient(cfg Config) Client {
        return c
 }
 
-func newClientWithError(cfg Config) (Client, error) {
+func newClientWithError(cfg config.Config) (Client, error) {
        var err error
        cfg.MetaServers, err = session.ResolveMetaAddr(cfg.MetaServers)
        if err != nil {
                return nil, err
        }
 
+       if cfg.EnablePrometheus {
+               metrics.InitMetrics(cfg)
+       }
+
        c := &pegasusClient{
-               tables:     make(map[string]TableConnector),
-               metaMgr:    session.NewMetaManager(cfg.MetaServers, 
session.NewNodeSession),
-               replicaMgr: session.NewReplicaManager(session.NewNodeSession),
+               tables:        make(map[string]TableConnector),
+               metaMgr:       session.NewMetaManager(cfg.MetaServers, 
session.NewNodeSession),
+               replicaMgr:    
session.NewReplicaManagerWithMetrics(session.NewNodeSession, 
cfg.EnablePrometheus),
+               enableMetrics: cfg.EnablePrometheus,
        }
        return c, nil
 }
@@ -101,7 +109,7 @@ func (p *pegasusClient) OpenTable(ctx context.Context, 
tableName string) (TableC
                }
 
                var tb TableConnector
-               tb, err := ConnectTable(ctx, tableName, p.metaMgr, p.replicaMgr)
+               tb, err := ConnectTable(ctx, tableName, p.metaMgr, 
p.replicaMgr, p.enableMetrics)
                if err != nil {
                        return nil, err
                }
diff --git a/go-client/pegasus/client_test.go b/go-client/pegasus/client_test.go
index 0832f6048..e20178e64 100644
--- a/go-client/pegasus/client_test.go
+++ b/go-client/pegasus/client_test.go
@@ -26,6 +26,7 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/incubator-pegasus/go-client/config"
        "github.com/fortytw2/leaktest"
        "github.com/stretchr/testify/assert"
 )
@@ -33,7 +34,7 @@ import (
 func TestPegasusClient_OpenTable(t *testing.T) {
        defer leaktest.Check(t)()
 
-       cfg := Config{
+       cfg := config.Config{
                MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", 
"0.0.0.0:34603"},
        }
 
@@ -64,7 +65,7 @@ func TestPegasusClient_OpenTableTimeout(t *testing.T) {
        defer leaktest.Check(t)()
 
        // make sure the port 8801 is not opened on your computer.
-       cfg := Config{
+       cfg := config.Config{
                MetaServers: []string{"0.0.0.0:8801"},
        }
 
@@ -85,7 +86,7 @@ func TestPegasusClient_OpenTableTimeout(t *testing.T) {
 func TestPegasusClient_ConcurrentOpenSameTable(t *testing.T) {
        defer leaktest.Check(t)()
 
-       cfg := Config{
+       cfg := config.Config{
                MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", 
"0.0.0.0:34603"},
        }
        client := NewClient(cfg)
@@ -120,7 +121,7 @@ func TestPegasusClient_ConcurrentOpenSameTable(t 
*testing.T) {
 func TestPegasusClient_ConcurrentMetaQueries(t *testing.T) {
        defer leaktest.Check(t)()
 
-       cfg := Config{
+       cfg := config.Config{
                MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", 
"0.0.0.0:34603"},
        }
        client := NewClient(cfg)
@@ -143,19 +144,19 @@ func TestPegasusClient_ConcurrentMetaQueries(t 
*testing.T) {
 }
 
 func TestPegasusClient_New(t *testing.T) {
-       c, err := newClientWithError(Config{
+       c, err := newClientWithError(config.Config{
                MetaServers: []string{"127.0.0.1:34601", "127.0.0.1:34602", 
"127.0.0.1:34603"},
        })
        assert.Nil(t, err)
        _ = c.Close()
 
-       c, err = newClientWithError(Config{
+       c, err = newClientWithError(config.Config{
                MetaServers: []string{"127abc"},
        })
        assert.NotNil(t, err)
        assert.Nil(t, c)
 
-       _, err = newClientWithError(Config{
+       _, err = newClientWithError(config.Config{
                MetaServers: []string{},
        })
        assert.NotNil(t, err)
diff --git a/go-client/pegasus/config.go b/go-client/pegasus/config.go
deleted file mode 100644
index 6af879018..000000000
--- a/go-client/pegasus/config.go
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package pegasus
-
-// Config is the configuration of pegasus client.
-type Config struct {
-       MetaServers []string `json:"meta_servers"`
-}
diff --git a/go-client/pegasus/table_connector.go 
b/go-client/pegasus/table_connector.go
index ca429d31e..f9d4ad51a 100644
--- a/go-client/pegasus/table_connector.go
+++ b/go-client/pegasus/table_connector.go
@@ -26,12 +26,14 @@ import (
        "errors"
        "fmt"
        "math"
+       "strings"
        "sync"
        "time"
 
        "github.com/apache/incubator-pegasus/go-client/idl/base"
        "github.com/apache/incubator-pegasus/go-client/idl/replication"
        "github.com/apache/incubator-pegasus/go-client/idl/rrdb"
+       "github.com/apache/incubator-pegasus/go-client/metrics"
        "github.com/apache/incubator-pegasus/go-client/pegalog"
        "github.com/apache/incubator-pegasus/go-client/pegasus/op"
        "github.com/apache/incubator-pegasus/go-client/session"
@@ -232,10 +234,11 @@ type pegasusTableConnector struct {
 
        logger pegalog.Logger
 
-       tableName string
-       appID     int32
-       parts     []*replicaNode
-       mu        sync.RWMutex
+       tableName     string
+       appID         int32
+       parts         []*replicaNode
+       enableMetrics bool
+       mu            sync.RWMutex
 
        confUpdateCh chan bool
        tom          tomb.Tomb
@@ -248,13 +251,14 @@ type replicaNode struct {
 
 // ConnectTable queries for the configuration of the given table, and set up 
connection to
 // the replicas which the table locates on.
-func ConnectTable(ctx context.Context, tableName string, meta 
*session.MetaManager, replica *session.ReplicaManager) (TableConnector, error) {
+func ConnectTable(ctx context.Context, tableName string, meta 
*session.MetaManager, replica *session.ReplicaManager, enableMetrics bool) 
(TableConnector, error) {
        p := &pegasusTableConnector{
-               tableName:    tableName,
-               meta:         meta,
-               replica:      replica,
-               confUpdateCh: make(chan bool, 1),
-               logger:       pegalog.GetLogger(),
+               tableName:     tableName,
+               meta:          meta,
+               replica:       replica,
+               enableMetrics: enableMetrics,
+               confUpdateCh:  make(chan bool, 1),
+               logger:        pegalog.GetLogger(),
        }
 
        // if the session became unresponsive, TableConnector auto-triggers
@@ -716,8 +720,32 @@ func (p *pegasusTableConnector) Incr(ctx context.Context, 
hashKey []byte, sortKe
 }
 
 func (p *pegasusTableConnector) runPartitionOp(ctx context.Context, hashKey 
[]byte, req op.Request, optype OpType) (interface{}, error) {
+       var errResult error
+       if p.enableMetrics {
+               start := time.Now()
+               defer func() {
+                       status := "success"
+                       if errResult != nil {
+                               if errors.Is(ctx.Err(), 
context.DeadlineExceeded) {
+                                       status = "timeout"
+                               } else {
+                                       status = "fail"
+                               }
+                       }
+                       labels := []string{
+                               p.tableName,
+                               optype.String(),
+                               status,
+                               strings.Join(p.meta.GetMetaIPAddrs(), ","),
+                       }
+                       elapsed := time.Since(start).Nanoseconds()
+                       metrics.PegasusClientOperationsSummary.Observe(labels, 
float64(elapsed))
+               }()
+       }
+
        // validate arguments
        if err := req.Validate(); err != nil {
+               errResult = err
                return 0, WrapError(err, optype)
        }
        partitionHash := crc64Hash(hashKey)
@@ -727,6 +755,7 @@ func (p *pegasusTableConnector) runPartitionOp(ctx 
context.Context, hashKey []by
                confUpdated, retry, err = p.handleReplicaError(err, part)
                return
        })
+       errResult = err
        return res, p.wrapPartitionError(err, gpid, part, optype)
 }
 
diff --git a/go-client/pegasus/table_connector_test.go 
b/go-client/pegasus/table_connector_test.go
index 4d7de11b9..75823c18c 100644
--- a/go-client/pegasus/table_connector_test.go
+++ b/go-client/pegasus/table_connector_test.go
@@ -24,12 +24,14 @@ import (
        "context"
        "errors"
        "fmt"
+
        "math"
        "sort"
        "sync"
        "testing"
        "time"
 
+       "github.com/apache/incubator-pegasus/go-client/config"
        "github.com/apache/incubator-pegasus/go-client/idl/base"
        "github.com/apache/incubator-pegasus/go-client/idl/replication"
        "github.com/apache/incubator-pegasus/go-client/pegalog"
@@ -87,7 +89,7 @@ func testSingleKeyOperations(t *testing.T, tb TableConnector, 
hashKey []byte, so
        assert.Nil(t, tb.Del(context.Background(), hashKey, sortKey))
 }
 
-var testingCfg = Config{
+var testingCfg = config.Config{
        MetaServers: []string{"0.0.0.0:34601", "0.0.0.0:34602", 
"0.0.0.0:34603"},
 }
 
diff --git a/go-client/session/meta_call.go b/go-client/session/meta_call.go
index 6b68a59cb..f86b9face 100644
--- a/go-client/session/meta_call.go
+++ b/go-client/session/meta_call.go
@@ -141,7 +141,7 @@ func (c *metaCall) issueSingleMeta(ctx context.Context, 
curLeader int) bool {
                        c.lock.Lock()
                        c.metaIPAddrs = append(c.metaIPAddrs, addr)
                        c.metas = append(c.metas, &metaSession{
-                               NodeSession: newNodeSession(addr, NodeTypeMeta),
+                               NodeSession: newNodeSession(addr, NodeTypeMeta, 
DisableMetrics),
                                logger:      pegalog.GetLogger(),
                        })
                        curLeader = len(c.metas) - 1
diff --git a/go-client/session/meta_session.go 
b/go-client/session/meta_session.go
index 9ead2b85a..252839f1e 100644
--- a/go-client/session/meta_session.go
+++ b/go-client/session/meta_session.go
@@ -77,7 +77,7 @@ func NewMetaManager(addrs []string, creator 
NodeSessionCreator) *MetaManager {
        metaIPAddrs := make([]string, len(addrs))
        for i, addr := range addrs {
                metas[i] = &metaSession{
-                       NodeSession: creator(addr, NodeTypeMeta),
+                       NodeSession: creator(addr, NodeTypeMeta, 
DisableMetrics),
                        logger:      pegalog.GetLogger(),
                }
                metaIPAddrs[i] = addr
@@ -120,6 +120,17 @@ func (m *MetaManager) QueryConfig(ctx context.Context, 
tableName string) (*repli
        return nil, err
 }
 
+// GetMetaIPAddrs returns a copy of the meta server addresses held by the
+// manager. A copy is returned so that callers cannot alias or modify the
+// manager's own slice.
+// NOTE(review): m.mu is taken on the assumption that it also guards
+// metaIPAddrs, as getCurrentLeader takes it — confirm against the setters.
+func (m *MetaManager) GetMetaIPAddrs() []string {
+       m.mu.RLock()
+       defer m.mu.RUnlock()
+       return append([]string(nil), m.metaIPAddrs...)
+}
+
 func (m *MetaManager) getCurrentLeader() int {
        m.mu.RLock()
        defer m.mu.RUnlock()
diff --git a/go-client/session/replica_session.go 
b/go-client/session/replica_session.go
index 09de6e6e3..8edd39fa5 100644
--- a/go-client/session/replica_session.go
+++ b/go-client/session/replica_session.go
@@ -188,6 +188,8 @@ type ReplicaManager struct {
        creator NodeSessionCreator
 
        unresponsiveHandler UnresponsiveHandler
+
+       enableMetrics bool
 }
 
 // UnresponsiveHandler is a callback executed when the session is in 
unresponsive state.
@@ -205,7 +207,7 @@ func (rm *ReplicaManager) GetReplica(addr string) 
*ReplicaSession {
 
        if _, ok := rm.replicas[addr]; !ok {
                r := &ReplicaSession{
-                       NodeSession: rm.creator(addr, NodeTypeReplica),
+                       NodeSession: rm.creator(addr, NodeTypeReplica, 
rm.enableMetrics),
                }
                withUnresponsiveHandler(r.NodeSession, rm.unresponsiveHandler)
                rm.replicas[addr] = r
@@ -214,9 +216,14 @@ func (rm *ReplicaManager) GetReplica(addr string) 
*ReplicaSession {
 }
 
 func NewReplicaManager(creator NodeSessionCreator) *ReplicaManager {
+       return NewReplicaManagerWithMetrics(creator, DisableMetrics)
+}
+
+func NewReplicaManagerWithMetrics(creator NodeSessionCreator, enableMetrics 
bool) *ReplicaManager {
        return &ReplicaManager{
-               replicas: make(map[string]*ReplicaSession),
-               creator:  creator,
+               replicas:      make(map[string]*ReplicaSession),
+               creator:       creator,
+               enableMetrics: enableMetrics,
        }
 }
 
diff --git a/go-client/session/session.go b/go-client/session/session.go
index 20aa585ae..0f226171f 100644
--- a/go-client/session/session.go
+++ b/go-client/session/session.go
@@ -28,6 +28,7 @@ import (
        "time"
 
        "github.com/apache/incubator-pegasus/go-client/idl/base"
+       "github.com/apache/incubator-pegasus/go-client/metrics"
        "github.com/apache/incubator-pegasus/go-client/pegalog"
        "github.com/apache/incubator-pegasus/go-client/rpc"
        "gopkg.in/tomb.v2"
@@ -47,6 +48,9 @@ const (
 
        // LatencyTracingThreshold means RPC's latency higher than the 
threshold (1000ms) will be traced
        LatencyTracingThreshold = time.Millisecond * 1000
+
+       // DisableMetrics is the enableMetrics value passed to create sessions that do not collect RPC message size metrics.
+       DisableMetrics = false
 )
 
 // NodeSession represents the network session to a node
@@ -68,7 +72,7 @@ type NodeSession interface {
 // NodeSessionCreator creates an instance of NodeSession,
 // receiving argument `string` as host address, `NodeType`
 // as the type of the node.
-type NodeSessionCreator func(string, NodeType) NodeSession
+type NodeSessionCreator func(string, NodeType, bool) NodeSession
 
 // An implementation of NodeSession.
 type nodeSession struct {
@@ -95,6 +99,8 @@ type nodeSession struct {
 
        unresponsiveHandler UnresponsiveHandler
        lastWriteTime       int64
+
+       enableMetrics bool
 }
 
 // withUnresponsiveHandler enables the session to handle the event when a 
network connection becomes unresponsive.
@@ -111,16 +117,17 @@ type requestListener struct {
        call *PegasusRpcCall
 }
 
-func newNodeSessionAddr(addr string, ntype NodeType) *nodeSession {
+func newNodeSessionAddr(addr string, ntype NodeType, enableMetrics bool) 
*nodeSession {
        return &nodeSession{
-               logger:      pegalog.GetLogger(),
-               ntype:       ntype,
-               seqId:       0,
-               codec:       NewPegasusCodec(),
-               pendingResp: make(map[int32]*requestListener),
-               reqc:        make(chan *requestListener),
-               addr:        addr,
-               tom:         &tomb.Tomb{},
+               logger:        pegalog.GetLogger(),
+               ntype:         ntype,
+               seqId:         0,
+               codec:         NewPegasusCodec(),
+               pendingResp:   make(map[int32]*requestListener),
+               reqc:          make(chan *requestListener),
+               addr:          addr,
+               tom:           &tomb.Tomb{},
+               enableMetrics: enableMetrics,
 
                //
                redialc: make(chan bool, 1),
@@ -130,14 +137,14 @@ func newNodeSessionAddr(addr string, ntype NodeType) 
*nodeSession {
 // NewNodeSession always returns a non-nil value even when the
 // connection attempt failed.
 // Each nodeSession corresponds to an RpcConn.
-func NewNodeSession(addr string, ntype NodeType) NodeSession {
-       return newNodeSession(addr, ntype)
+func NewNodeSession(addr string, ntype NodeType, enableMetrics bool) 
NodeSession {
+       return newNodeSession(addr, ntype, enableMetrics)
 }
 
-func newNodeSession(addr string, ntype NodeType) *nodeSession {
+func newNodeSession(addr string, ntype NodeType, enableMetrics bool) 
*nodeSession {
        logger := pegalog.GetLogger()
 
-       n := newNodeSessionAddr(addr, ntype)
+       n := newNodeSessionAddr(addr, ntype, enableMetrics)
        logger.Printf("create session with %s", n)
 
        n.conn = rpc.NewRpcConn(addr)
@@ -400,6 +407,10 @@ func (n *nodeSession) CallWithGpid(ctx context.Context, 
gpid *base.Gpid, partiti
 }
 
 func (n *nodeSession) writeRequest(r *PegasusRpcCall) error {
+       if n.enableMetrics {
+               metrics.PegasusClientRpcSizeSummary.Observe([]string{r.Name}, 
float64(len(r.RawReq)))
+       }
+
        return n.conn.Write(r.RawReq)
 }
 
diff --git a/go-client/session/session_test.go 
b/go-client/session/session_test.go
index 8ec3e52cd..1860b186b 100644
--- a/go-client/session/session_test.go
+++ b/go-client/session/session_test.go
@@ -39,7 +39,7 @@ import (
 )
 
 func newFakeNodeSession(reader io.Reader, writer io.Writer) *nodeSession {
-       n := newNodeSessionAddr("", NodeTypeMeta)
+       n := newNodeSessionAddr("", NodeTypeMeta, DisableMetrics)
        n.conn = rpc.NewFakeRpcConn(reader, writer)
        n.codec = &MockCodec{}
        return n
@@ -47,7 +47,7 @@ func newFakeNodeSession(reader io.Reader, writer io.Writer) 
*nodeSession {
 
 func newMetaSession(addr string) *metaSession {
        return &metaSession{
-               NodeSession: newNodeSession(addr, NodeTypeMeta),
+               NodeSession: newNodeSession(addr, NodeTypeMeta, DisableMetrics),
                logger:      pegalog.GetLogger(),
        }
 }
@@ -93,7 +93,7 @@ func TestNodeSession_LoopForDialingSuccess(t *testing.T) {
        defer leaktest.Check(t)()
 
        addr := "www.baidu.com:80"
-       n := newNodeSessionAddr(addr, "meta")
+       n := newNodeSessionAddr(addr, "meta", DisableMetrics)
        n.conn = rpc.NewRpcConn(addr)
 
        n.tom.Go(n.loopForDialing)
@@ -116,7 +116,7 @@ func TestNodeSession_LoopForDialingCancelled(t *testing.T) {
        defer leaktest.Check(t)()
 
        addr := "www.baidu.com:12321"
-       n := newNodeSessionAddr(addr, "meta")
+       n := newNodeSessionAddr(addr, "meta", DisableMetrics)
        n.conn = rpc.NewRpcConn(addr)
 
        n.tom.Go(n.loopForDialing)
@@ -169,7 +169,7 @@ func TestNodeSession_WaitUntilSessionReady(t *testing.T) {
        defer leaktest.Check(t)()
 
        func() {
-               n := newNodeSession("www.baidu.com:12321", "meta")
+               n := newNodeSession("www.baidu.com:12321", "meta", 
DisableMetrics)
                defer n.Close()
 
                ctx, cancel := context.WithTimeout(context.Background(), 
time.Millisecond*50)
@@ -181,7 +181,7 @@ func TestNodeSession_WaitUntilSessionReady(t *testing.T) {
        }()
 
        func() {
-               n := newNodeSession("0.0.0.0:8800", "meta")
+               n := newNodeSession("0.0.0.0:8800", "meta", DisableMetrics)
                defer n.Close()
 
                err := n.waitUntilSessionReady(context.Background())
@@ -195,7 +195,7 @@ func TestNodeSession_CallToEcho(t *testing.T) {
        defer leaktest.Check(t)()
 
        // start echo server first
-       n := newNodeSession("0.0.0.0:8800", NodeTypeMeta)
+       n := newNodeSession("0.0.0.0:8800", NodeTypeMeta, DisableMetrics)
        defer n.Close()
 
        var expected []byte
@@ -237,7 +237,7 @@ func TestNodeSession_ConcurrentCallToEcho(t *testing.T) {
        defer leaktest.Check(t)()
 
        // start echo server first
-       n := newNodeSession("0.0.0.0:8800", NodeTypeMeta)
+       n := newNodeSession("0.0.0.0:8800", NodeTypeMeta, DisableMetrics)
 
        mockCodec := &MockCodec{}
        mockCodec.MockMarshal(func(v interface{}) ([]byte, error) {
@@ -320,7 +320,7 @@ func TestNodeSession_RestartConnection(t *testing.T) {
 func TestNodeSession_ReceiveErrorCode(t *testing.T) {
        defer leaktest.Check(t)()
 
-       n := newNodeSession("0.0.0.0:8800", NodeTypeMeta)
+       n := newNodeSession("0.0.0.0:8800", NodeTypeMeta, DisableMetrics)
        defer n.Close()
 
        arg := rrdb.NewMetaQueryCfgArgs()
@@ -353,7 +353,7 @@ func TestNodeSession_Redial(t *testing.T) {
        defer leaktest.Check(t)()
 
        addr := "0.0.0.0:8800"
-       n := newNodeSessionAddr(addr, "meta")
+       n := newNodeSessionAddr(addr, "meta", DisableMetrics)
        n.conn = rpc.NewRpcConn(addr)
        defer n.Close()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to