This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f6754a0 [fix #472] move GetPartitionedTopicMetadata to lookup service
(#478)
f6754a0 is described below
commit f6754a0dfad4e906fb439183380027f1745d6a9e
Author: Rui Fu <[email protected]>
AuthorDate: Fri Mar 5 17:37:35 2021 +0800
[fix #472] move GetPartitionedTopicMetadata to lookup service (#478)
Fixes #472
### Motivation
#472 addressed a socket leakage in go client. with some debug works, a
difference between java client and go client was found. one difference is go
client send `PartitionedTopicMetadata` request with new connection, but java is
using lookup service's connection. This pr added `GetPartitionedTopicMetadata`
to lookup service to reuse the lookup service connection.
Through this pr, the leakage has been resolved with [this sample
deployment](https://gist.github.com/freeznet/8681525ddd240877af88633363ba222f),
the sockets are keep to 7 for 2 hours test. But still need to check with
@KannarFr to see if there are any other possible leakages since I cannot have
a cluster with 20 proxies/20 brokers/7 ZK locally.
---
go.sum | 6 --
pulsar/client_impl.go | 9 +--
pulsar/internal/lookup_service.go | 26 +++++++
pulsar/internal/lookup_service_test.go | 120 ++++++++++++++++++++++++++++-----
pulsar/internal/metrics.go | 20 ++++--
5 files changed, 145 insertions(+), 36 deletions(-)
diff --git a/go.sum b/go.sum
index 6b4db62..7f61837 100644
--- a/go.sum
+++ b/go.sum
@@ -161,7 +161,6 @@ go.uber.org/atomic v1.7.0
h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod
h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod
h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
-golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod
h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
@@ -177,7 +176,6 @@ golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod
h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7
h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201021035429-f5854403a974
h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod
h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
@@ -203,12 +201,10 @@ golang.org/x/sys
v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1
h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -219,7 +215,6 @@ golang.org/x/tools
v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
-golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543
h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod
h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -246,7 +241,6 @@ gopkg.in/yaml.v2 v2.2.1/go.mod
h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index b7950eb..7acbaff 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -163,17 +163,10 @@ func (c *client) TopicPartitions(topic string) ([]string,
error) {
return nil, err
}
- id := c.rpcClient.NewRequestID()
- res, err := c.rpcClient.RequestToAnyBroker(id,
pb.BaseCommand_PARTITIONED_METADATA,
- &pb.CommandPartitionedTopicMetadata{
- RequestId: &id,
- Topic: &topicName.Name,
- })
+ r, err := c.lookupService.GetPartitionedTopicMetadata(topic)
if err != nil {
return nil, err
}
-
- r := res.Response.PartitionMetadataResponse
if r != nil {
if r.Error != nil {
return nil, newError(ResultLookupError,
r.GetError().String())
diff --git a/pulsar/internal/lookup_service.go
b/pulsar/internal/lookup_service.go
index 400063d..65fa383 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -38,6 +38,10 @@ type LookupService interface {
// Lookup perform a lookup for the given topic, confirm the location of
the broker
// where the topic is located, and return the LookupResult.
Lookup(topic string) (*LookupResult, error)
+
+ // GetPartitionedTopicMetadata perform a
CommandPartitionedTopicMetadata request for
+ // the given topic, returns the CommandPartitionedTopicMetadataResponse
as the result.
+ GetPartitionedTopicMetadata(topic string)
(*pb.CommandPartitionedTopicMetadataResponse, error)
}
type lookupService struct {
@@ -150,3 +154,25 @@ func (ls *lookupService) Lookup(topic string)
(*LookupResult, error) {
return nil, errors.New("exceeded max number of redirection during topic
lookup")
}
+
+func (ls *lookupService) GetPartitionedTopicMetadata(topic string)
(*pb.CommandPartitionedTopicMetadataResponse,
+ error) {
+ ls.metrics.PartitionedTopicMetadataRequestsCount.Inc()
+ topicName, err := ParseTopicName(topic)
+ if err != nil {
+ return nil, err
+ }
+
+ id := ls.rpcClient.NewRequestID()
+ res, err := ls.rpcClient.RequestToAnyBroker(id,
pb.BaseCommand_PARTITIONED_METADATA,
+ &pb.CommandPartitionedTopicMetadata{
+ RequestId: &id,
+ Topic: &topicName.Name,
+ })
+ if err != nil {
+ return nil, err
+ }
+ ls.log.Debugf("Got topic{%s} partitioned metadata response: %+v",
topic, res)
+
+ return res.Response.PartitionMetadataResponse, nil
+}
diff --git a/pulsar/internal/lookup_service_test.go
b/pulsar/internal/lookup_service_test.go
index 89e4475..ad0e1bd 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -28,7 +28,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar/log"
)
-type mockedRPCClient struct {
+type mockedLookupRPCClient struct {
requestIDGenerator uint64
t *testing.T
@@ -38,20 +38,20 @@ type mockedRPCClient struct {
}
// Create a new unique request id
-func (c *mockedRPCClient) NewRequestID() uint64 {
+func (c *mockedLookupRPCClient) NewRequestID() uint64 {
c.requestIDGenerator++
return c.requestIDGenerator
}
-func (c *mockedRPCClient) NewProducerID() uint64 {
+func (c *mockedLookupRPCClient) NewProducerID() uint64 {
return 1
}
-func (c *mockedRPCClient) NewConsumerID() uint64 {
+func (c *mockedLookupRPCClient) NewConsumerID() uint64 {
return 1
}
-func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType
pb.BaseCommand_Type,
+func (c *mockedLookupRPCClient) RequestToAnyBroker(requestID uint64, cmdType
pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
@@ -71,7 +71,7 @@ func (c *mockedRPCClient) RequestToAnyBroker(requestID
uint64, cmdType pb.BaseCo
}, nil
}
-func (c *mockedRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL,
requestID uint64,
+func (c *mockedLookupRPCClient) Request(logicalAddr *url.URL, physicalAddr
*url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
{
assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
expectedRequest := &c.expectedRequests[0]
@@ -93,13 +93,14 @@ func (c *mockedRPCClient) Request(logicalAddr *url.URL,
physicalAddr *url.URL, r
}, nil
}
-func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64,
cmdType pb.BaseCommand_Type,
+func (c *mockedLookupRPCClient) RequestOnCnx(cnx Connection, requestID uint64,
cmdType pb.BaseCommand_Type,
message proto.Message) (*RPCResult, error) {
assert.Fail(c.t, "Shouldn't be called")
return nil, nil
}
-func (c *mockedRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType
pb.BaseCommand_Type, message proto.Message) error {
+func (c *mockedLookupRPCClient) RequestOnCnxNoWait(cnx Connection, cmdType
pb.BaseCommand_Type,
+ message proto.Message) error {
assert.Fail(c.t, "Shouldn't be called")
return nil
}
@@ -112,7 +113,7 @@ func TestLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
- ls := NewLookupService(&mockedRPCClient{
+ ls := NewLookupService(&mockedLookupRPCClient{
t: t,
expectedRequests: []pb.CommandLookupTopic{
@@ -144,7 +145,7 @@ func TestTlsLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)
- ls := NewLookupService(&mockedRPCClient{
+ ls := NewLookupService(&mockedLookupRPCClient{
t: t,
expectedRequests: []pb.CommandLookupTopic{
@@ -176,7 +177,7 @@ func TestLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
- ls := NewLookupService(&mockedRPCClient{
+ ls := NewLookupService(&mockedLookupRPCClient{
t: t,
expectedRequests: []pb.CommandLookupTopic{
@@ -209,7 +210,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)
- ls := NewLookupService(&mockedRPCClient{
+ ls := NewLookupService(&mockedLookupRPCClient{
t: t,
expectedRequests: []pb.CommandLookupTopic{
@@ -242,7 +243,7 @@ func TestLookupWithRedirect(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
- ls := NewLookupService(&mockedRPCClient{
+ ls := NewLookupService(&mockedLookupRPCClient{
t: t,
expectedURL: "pulsar://broker-2:6650",
@@ -286,7 +287,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)
- ls := NewLookupService(&mockedRPCClient{
+ ls := NewLookupService(&mockedLookupRPCClient{
t: t,
expectedURL: "pulsar+ssl://broker-2:6651",
@@ -330,7 +331,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
- ls := NewLookupService(&mockedRPCClient{
+ ls := NewLookupService(&mockedLookupRPCClient{
t: t,
expectedRequests: []pb.CommandLookupTopic{
@@ -360,7 +361,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
- ls := NewLookupService(&mockedRPCClient{
+ ls := NewLookupService(&mockedLookupRPCClient{
t: t,
expectedRequests: []pb.CommandLookupTopic{
@@ -383,3 +384,90 @@ func TestLookupWithLookupFailure(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, lr)
}
+
+type mockedPartitionedTopicMetadataRPCClient struct {
+ requestIDGenerator uint64
+ t *testing.T
+
+ expectedRequests []pb.CommandPartitionedTopicMetadata
+ mockedResponses []pb.CommandPartitionedTopicMetadataResponse
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) NewRequestID() uint64 {
+ m.requestIDGenerator++
+ return m.requestIDGenerator
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) NewProducerID() uint64 {
+ return 1
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) NewConsumerID() uint64 {
+ return 1
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) RequestToAnyBroker(requestID
uint64, cmdType pb.BaseCommand_Type,
+ message proto.Message) (*RPCResult, error) {
+ assert.Equal(m.t, cmdType, pb.BaseCommand_PARTITIONED_METADATA)
+
+ expectedRequest := &m.expectedRequests[0]
+ m.expectedRequests = m.expectedRequests[1:]
+
+ assert.Equal(m.t, *expectedRequest.RequestId, requestID)
+
+ mockedResponse := &m.mockedResponses[0]
+ m.mockedResponses = m.mockedResponses[1:]
+
+ return &RPCResult{
+ &pb.BaseCommand{
+ PartitionMetadataResponse: mockedResponse,
+ },
+ nil,
+ }, nil
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) Request(logicalAddr *url.URL,
physicalAddr *url.URL, requestID uint64,
+ cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
{
+ assert.Fail(m.t, "Shouldn't be called")
+ return nil, nil
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnxNoWait(cnx
Connection, cmdType pb.BaseCommand_Type,
+ message proto.Message) error {
+ assert.Fail(m.t, "Shouldn't be called")
+ return nil
+}
+
+func (m mockedPartitionedTopicMetadataRPCClient) RequestOnCnx(cnx Connection,
requestID uint64,
+ cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
{
+ assert.Fail(m.t, "Shouldn't be called")
+ return nil, nil
+}
+
+func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
+ url, err := url.Parse("pulsar://example:6650")
+ assert.NoError(t, err)
+
+ ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
+ t: t,
+
+ expectedRequests: []pb.CommandPartitionedTopicMetadata{
+ {
+ RequestId: proto.Uint64(1),
+ Topic: proto.String("my-topic"),
+ },
+ },
+ mockedResponses: []pb.CommandPartitionedTopicMetadataResponse{
+ {
+ RequestId: proto.Uint64(1),
+ Partitions: proto.Uint32(1),
+ Response:
pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
+ },
+ },
+ }, url, false, log.DefaultNopLogger(),
NewMetricsProvider(map[string]string{}))
+
+ metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
+ assert.NoError(t, err)
+ assert.NotNil(t, metadata)
+ assert.Equal(t, metadata.GetPartitions(), uint32(1))
+}
diff --git a/pulsar/internal/metrics.go b/pulsar/internal/metrics.go
index 875587c..d44522d 100644
--- a/pulsar/internal/metrics.go
+++ b/pulsar/internal/metrics.go
@@ -51,12 +51,13 @@ type Metrics struct {
readersClosed *prometheus.CounterVec
// Metrics that are not labeled with topic, are immediately available
- ConnectionsOpened prometheus.Counter
- ConnectionsClosed prometheus.Counter
- ConnectionsEstablishmentErrors prometheus.Counter
- ConnectionsHandshakeErrors prometheus.Counter
- LookupRequestsCount prometheus.Counter
- RPCRequestCount prometheus.Counter
+ ConnectionsOpened prometheus.Counter
+ ConnectionsClosed prometheus.Counter
+ ConnectionsEstablishmentErrors prometheus.Counter
+ ConnectionsHandshakeErrors prometheus.Counter
+ LookupRequestsCount prometheus.Counter
+ PartitionedTopicMetadataRequestsCount prometheus.Counter
+ RPCRequestCount prometheus.Counter
}
type TopicMetrics struct {
@@ -268,6 +269,12 @@ func NewMetricsProvider(userDefinedLabels
map[string]string) *Metrics {
ConstLabels: constLabels,
}),
+ PartitionedTopicMetadataRequestsCount:
prometheus.NewCounter(prometheus.CounterOpts{
+ Name:
"pulsar_client_partitioned_topic_metadata_count",
+ Help: "Counter of partitioned_topic_metadata
requests made by the client",
+ ConstLabels: constLabels,
+ }),
+
RPCRequestCount: prometheus.NewCounter(prometheus.CounterOpts{
Name: "pulsar_client_rpc_count",
Help: "Counter of RPC requests made by the
client",
@@ -306,6 +313,7 @@ func NewMetricsProvider(userDefinedLabels
map[string]string) *Metrics {
prometheus.DefaultRegisterer.Register(metrics.ConnectionsEstablishmentErrors)
prometheus.DefaultRegisterer.Register(metrics.ConnectionsHandshakeErrors)
prometheus.DefaultRegisterer.Register(metrics.LookupRequestsCount)
+
prometheus.DefaultRegisterer.Register(metrics.PartitionedTopicMetadataRequestsCount)
prometheus.DefaultRegisterer.Register(metrics.RPCRequestCount)
return metrics
}