This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f6754a0  [fix #472] move GetPartitionedTopicMetadata to lookup service 
(#478)
f6754a0 is described below

commit f6754a0dfad4e906fb439183380027f1745d6a9e
Author: Rui Fu <[email protected]>
AuthorDate: Fri Mar 5 17:37:35 2021 +0800

    [fix #472] move GetPartitionedTopicMetadata to lookup service (#478)
    
    Fixes #472
    
    ### Motivation
    
    #472 reported a socket leak in the Go client. While debugging, a 
difference between the Java client and the Go client was found: the Go 
client sends the `PartitionedTopicMetadata` request on a new connection, whereas 
the Java client uses the lookup service's connection. This PR adds 
`GetPartitionedTopicMetadata` to the lookup service so that the lookup service's 
connection is reused.
    
    With this PR, the leak is resolved when tested with [this sample 
deployment](https://gist.github.com/freeznet/8681525ddd240877af88633363ba222f): 
the socket count stays at 7 over a 2-hour test. We still need to check with 
@KannarFr to see whether there are any other possible leaks, since I cannot run 
a cluster with 20 proxies/20 brokers/7 ZK locally.
---
 go.sum                                 |   6 --
 pulsar/client_impl.go                  |   9 +--
 pulsar/internal/lookup_service.go      |  26 +++++++
 pulsar/internal/lookup_service_test.go | 120 ++++++++++++++++++++++++++++-----
 pulsar/internal/metrics.go             |  20 ++++--
 5 files changed, 145 insertions(+), 36 deletions(-)

diff --git a/go.sum b/go.sum
index 6b4db62..7f61837 100644
--- a/go.sum
+++ b/go.sum
@@ -161,7 +161,6 @@ go.uber.org/atomic v1.7.0 
h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod 
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod 
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod 
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 
h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod 
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 
h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
@@ -177,7 +176,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod 
h1:z5CRVTTTmAJ677TzLL
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 
h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
 golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod 
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974 
h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
 golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod 
h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
@@ -203,12 +201,10 @@ golang.org/x/sys 
v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 
h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
 golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f 
h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
 golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -219,7 +215,6 @@ golang.org/x/tools 
v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod 
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 
h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 
h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod 
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -246,7 +241,6 @@ gopkg.in/yaml.v2 v2.2.1/go.mod 
h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
 gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index b7950eb..7acbaff 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -163,17 +163,10 @@ func (c *client) TopicPartitions(topic string) ([]string, 
error) {
                return nil, err
        }
 
-       id := c.rpcClient.NewRequestID()
-       res, err := c.rpcClient.RequestToAnyBroker(id, 
pb.BaseCommand_PARTITIONED_METADATA,
-               &pb.CommandPartitionedTopicMetadata{
-                       RequestId: &id,
-                       Topic:     &topicName.Name,
-               })
+       r, err := c.lookupService.GetPartitionedTopicMetadata(topic)
        if err != nil {
                return nil, err
        }
-
-       r := res.Response.PartitionMetadataResponse
        if r != nil {
                if r.Error != nil {
                        return nil, newError(ResultLookupError, 
r.GetError().String())
diff --git a/pulsar/internal/lookup_service.go 
b/pulsar/internal/lookup_service.go
index 400063d..65fa383 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -38,6 +38,10 @@ type LookupService interface {
        // Lookup perform a lookup for the given topic, confirm the location of 
the broker
        // where the topic is located, and return the LookupResult.
        Lookup(topic string) (*LookupResult, error)
+
+       // GetPartitionedTopicMetadata perform a 
CommandPartitionedTopicMetadata request for
+       // the given topic, returns the CommandPartitionedTopicMetadataResponse 
as the result.
+       GetPartitionedTopicMetadata(topic string) 
(*pb.CommandPartitionedTopicMetadataResponse, error)
 }
 
 type lookupService struct {
@@ -150,3 +154,25 @@ func (ls *lookupService) Lookup(topic string) 
(*LookupResult, error) {
 
        return nil, errors.New("exceeded max number of redirection during topic 
lookup")
 }
+
+func (ls *lookupService) GetPartitionedTopicMetadata(topic string) 
(*pb.CommandPartitionedTopicMetadataResponse,
+       error) {
+       ls.metrics.PartitionedTopicMetadataRequestsCount.Inc()
+       topicName, err := ParseTopicName(topic)
+       if err != nil {
+               return nil, err
+       }
+
+       id := ls.rpcClient.NewRequestID()
+       res, err := ls.rpcClient.RequestToAnyBroker(id, 
pb.BaseCommand_PARTITIONED_METADATA,
+               &pb.CommandPartitionedTopicMetadata{
+                       RequestId: &id,
+                       Topic:     &topicName.Name,
+               })
+       if err != nil {
+               return nil, err
+       }
+       ls.log.Debugf("Got topic{%s} partitioned metadata response: %+v", 
topic, res)
+
+       return res.Response.PartitionMetadataResponse, nil
+}
diff --git a/pulsar/internal/lookup_service_test.go 
b/pulsar/internal/lookup_service_test.go
index 89e4475..ad0e1bd 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -28,7 +28,7 @@ import (
        "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
-type mockedRPCClient struct {
+type mockedLookupRPCClient struct {
        requestIDGenerator uint64
        t                  *testing.T
 
@@ -38,20 +38,20 @@ type mockedRPCClient struct {
 }
 
 // Create a new unique request id
-func (c *mockedRPCClient) NewRequestID() uint64 {
+func (c *mockedLookupRPCClient) NewRequestID() uint64 {
        c.requestIDGenerator++
        return c.requestIDGenerator
 }
 
-func (c *mockedRPCClient) NewProducerID() uint64 {
+func (c *mockedLookupRPCClient) NewProducerID() uint64 {
        return 1
 }
 
-func (c *mockedRPCClient) NewConsumerID() uint64 {
+func (c *mockedLookupRPCClient) NewConsumerID() uint64 {
        return 1
 }
 
-func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType 
pb.BaseCommand_Type,
+func (c *mockedLookupRPCClient) RequestToAnyBroker(requestID uint64, cmdType 
pb.BaseCommand_Type,
        message proto.Message) (*RPCResult, error) {
        assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
 
@@ -71,7 +71,7 @@ func (c *mockedRPCClient) RequestToAnyBroker(requestID 
uint64, cmdType pb.BaseCo
        }, nil
 }
 
-func (c *mockedRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, 
requestID uint64,
+func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr 
*url.URL, requestID uint64,
        cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
        assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
        expectedRequest := &c.expectedRequests[0]
@@ -93,13 +93,14 @@ func (c *mockedRPCClient) Request(logicalAddr *url.URL, 
physicalAddr *url.URL, r
        }, nil
 }
 
-func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64, 
cmdType pb.BaseCommand_Type,
+func (c *mockedLookupRPCClient) RequestOnCnx(cnx Connection, requestID uint64, 
cmdType pb.BaseCommand_Type,
        message proto.Message) (*RPCResult, error) {
        assert.Fail(c.t, "Shouldn't be called")
        return nil, nil
 }
 
-func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType 
pb.BaseCommand_Type, message proto.Message) error {
+func (c *mockedLookupRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType 
pb.BaseCommand_Type,
+       message proto.Message) error {
        assert.Fail(c.t, "Shouldn't be called")
        return nil
 }
@@ -112,7 +113,7 @@ func TestLookupSuccess(t *testing.T) {
        url, err := url.Parse("pulsar://example:6650")
        assert.NoError(t, err)
 
-       ls := NewLookupService(&mockedRPCClient{
+       ls := NewLookupService(&mockedLookupRPCClient{
                t: t,
 
                expectedRequests: []pb.CommandLookupTopic{
@@ -144,7 +145,7 @@ func TestTlsLookupSuccess(t *testing.T) {
        url, err := url.Parse("pulsar+ssl://example:6651")
        assert.NoError(t, err)
 
-       ls := NewLookupService(&mockedRPCClient{
+       ls := NewLookupService(&mockedLookupRPCClient{
                t: t,
 
                expectedRequests: []pb.CommandLookupTopic{
@@ -176,7 +177,7 @@ func TestLookupWithProxy(t *testing.T) {
        url, err := url.Parse("pulsar://example:6650")
        assert.NoError(t, err)
 
-       ls := NewLookupService(&mockedRPCClient{
+       ls := NewLookupService(&mockedLookupRPCClient{
                t: t,
 
                expectedRequests: []pb.CommandLookupTopic{
@@ -209,7 +210,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
        url, err := url.Parse("pulsar+ssl://example:6651")
        assert.NoError(t, err)
 
-       ls := NewLookupService(&mockedRPCClient{
+       ls := NewLookupService(&mockedLookupRPCClient{
                t: t,
 
                expectedRequests: []pb.CommandLookupTopic{
@@ -242,7 +243,7 @@ func TestLookupWithRedirect(t *testing.T) {
        url, err := url.Parse("pulsar://example:6650")
        assert.NoError(t, err)
 
-       ls := NewLookupService(&mockedRPCClient{
+       ls := NewLookupService(&mockedLookupRPCClient{
                t:           t,
                expectedURL: "pulsar://broker-2:6650",
 
@@ -286,7 +287,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
        url, err := url.Parse("pulsar+ssl://example:6651")
        assert.NoError(t, err)
 
-       ls := NewLookupService(&mockedRPCClient{
+       ls := NewLookupService(&mockedLookupRPCClient{
                t:           t,
                expectedURL: "pulsar+ssl://broker-2:6651",
 
@@ -330,7 +331,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
        url, err := url.Parse("pulsar://example:6650")
        assert.NoError(t, err)
 
-       ls := NewLookupService(&mockedRPCClient{
+       ls := NewLookupService(&mockedLookupRPCClient{
                t: t,
 
                expectedRequests: []pb.CommandLookupTopic{
@@ -360,7 +361,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
        url, err := url.Parse("pulsar://example:6650")
        assert.NoError(t, err)
 
-       ls := NewLookupService(&mockedRPCClient{
+       ls := NewLookupService(&mockedLookupRPCClient{
                t: t,
 
                expectedRequests: []pb.CommandLookupTopic{
@@ -383,3 +384,90 @@ func TestLookupWithLookupFailure(t *testing.T) {
        assert.Error(t, err)
        assert.Nil(t, lr)
 }
+
+type mockedPartitionedTopicMetadataRPCClient struct {
+       requestIDGenerator uint64
+       t                  *testing.T
+
+       expectedRequests []pb.CommandPartitionedTopicMetadata
+       mockedResponses  []pb.CommandPartitionedTopicMetadataResponse
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) NewRequestID() uint64 {
+       m.requestIDGenerator++
+       return m.requestIDGenerator
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) NewProducerID() uint64 {
+       return 1
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) NewConsumerID() uint64 {
+       return 1
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) RequestToAnyBroker(requestID 
uint64, cmdType pb.BaseCommand_Type,
+       message proto.Message) (*RPCResult, error) {
+       assert.Equal(m.t, cmdType, pb.BaseCommand_PARTITIONED_METADATA)
+
+       expectedRequest := &m.expectedRequests[0]
+       m.expectedRequests = m.expectedRequests[1:]
+
+       assert.Equal(m.t, *expectedRequest.RequestId, requestID)
+
+       mockedResponse := &m.mockedResponses[0]
+       m.mockedResponses = m.mockedResponses[1:]
+
+       return &RPCResult{
+               &pb.BaseCommand{
+                       PartitionMetadataResponse: mockedResponse,
+               },
+               nil,
+       }, nil
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) Request(logicalAddr *url.URL, 
physicalAddr *url.URL, requestID uint64,
+       cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
+       assert.Fail(m.t, "Shouldn't be called")
+       return nil, nil
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(cnx 
Connection, cmdType pb.BaseCommand_Type,
+       message proto.Message) error {
+       assert.Fail(m.t, "Shouldn't be called")
+       return nil
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(cnx Connection, 
requestID uint64,
+       cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) 
{
+       assert.Fail(m.t, "Shouldn't be called")
+       return nil, nil
+}
+
+func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
+       url, err := url.Parse("pulsar://example:6650")
+       assert.NoError(t, err)
+
+       ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
+               t: t,
+
+               expectedRequests: []pb.CommandPartitionedTopicMetadata{
+                       {
+                               RequestId: proto.Uint64(1),
+                               Topic:     proto.String("my-topic"),
+                       },
+               },
+               mockedResponses: []pb.CommandPartitionedTopicMetadataResponse{
+                       {
+                               RequestId:  proto.Uint64(1),
+                               Partitions: proto.Uint32(1),
+                               Response:   
pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
+                       },
+               },
+       }, url, false, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+
+       metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
+       assert.NoError(t, err)
+       assert.NotNil(t, metadata)
+       assert.Equal(t, metadata.GetPartitions(), uint32(1))
+}
diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index 875587c..d44522d 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -51,12 +51,13 @@ type Metrics struct {
        readersClosed       *prometheus.CounterVec
 
        // Metrics that are not labeled with topic, are immediately available
-       ConnectionsOpened              prometheus.Counter
-       ConnectionsClosed              prometheus.Counter
-       ConnectionsEstablishmentErrors prometheus.Counter
-       ConnectionsHandshakeErrors     prometheus.Counter
-       LookupRequestsCount            prometheus.Counter
-       RPCRequestCount                prometheus.Counter
+       ConnectionsOpened                     prometheus.Counter
+       ConnectionsClosed                     prometheus.Counter
+       ConnectionsEstablishmentErrors        prometheus.Counter
+       ConnectionsHandshakeErrors            prometheus.Counter
+       LookupRequestsCount                   prometheus.Counter
+       PartitionedTopicMetadataRequestsCount prometheus.Counter
+       RPCRequestCount                       prometheus.Counter
 }
 
 type TopicMetrics struct {
@@ -268,6 +269,12 @@ func NewMetricsProvider(userDefinedLabels 
map[string]string) *Metrics {
                        ConstLabels: constLabels,
                }),
 
+               PartitionedTopicMetadataRequestsCount: 
prometheus.NewCounter(prometheus.CounterOpts{
+                       Name:        
"pulsar_client_partitioned_topic_metadata_count",
+                       Help:        "Counter of partitioned_topic_metadata 
requests made by the client",
+                       ConstLabels: constLabels,
+               }),
+
                RPCRequestCount: prometheus.NewCounter(prometheus.CounterOpts{
                        Name:        "pulsar_client_rpc_count",
                        Help:        "Counter of RPC requests made by the 
client",
@@ -306,6 +313,7 @@ func NewMetricsProvider(userDefinedLabels 
map[string]string) *Metrics {
        
prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
        
prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
        prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
+       
prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
        prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
        return metrics
 }

Reply via email to