This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fb7f55  [Issue 177] add multiple hosts support (#484)
5fb7f55 is described below

commit 5fb7f55939e6f4ec28126339b22a6b4018c0f2fa
Author: Rui Fu <[email protected]>
AuthorDate: Tue Mar 9 12:50:59 2021 +0800

    [Issue 177] add multiple hosts support (#484)
    
    Fixes #177
    
    Master Issue https://github.com/apache/pulsar/issues/3218
    
    ### Motivation
    
    Add support for multiple hosts in the service URL to the Go client.
    
    ### Modifications
    
    - add a service URI parser and a service name resolver
    - use the service name resolver in the lookup service and the RPC client
    - add unit tests
    - add integration tests
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
---
 go.sum                                        |   1 +
 pulsar/client_impl.go                         |   6 +-
 pulsar/client_impl_test.go                    |  56 +++++++
 pulsar/internal/connection_pool.go            |   4 +
 pulsar/internal/lookup_service.go             |  27 ++--
 pulsar/internal/lookup_service_test.go        |  55 +++++--
 pulsar/internal/rpc_client.go                 |  35 +++--
 pulsar/internal/service_name_resolver.go      | 116 ++++++++++++++
 pulsar/internal/service_name_resolver_test.go | 131 ++++++++++++++++
 pulsar/internal/service_uri.go                | 211 ++++++++++++++++++++++++++
 pulsar/internal/service_uri_test.go           | 171 +++++++++++++++++++++
 pulsar/producer_test.go                       |   2 +-
 pulsar/reader_test.go                         |  52 +++++++
 13 files changed, 829 insertions(+), 38 deletions(-)

diff --git a/go.sum b/go.sum
index 7f61837..a14857b 100644
--- a/go.sum
+++ b/go.sum
@@ -27,6 +27,7 @@ github.com/danieljoos/wincred v1.0.2/go.mod 
h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3E
 github.com/davecgh/go-spew v1.1.0/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 
h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
 github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod 
h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4=
 github.com/dimfeld/httptreemux v5.0.1+incompatible 
h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
 github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod 
h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 5882dd4..ee8ff50 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -123,8 +123,10 @@ func newClient(options ClientOptions) (Client, error) {
                log:     logger,
                metrics: metrics,
        }
-       c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, 
logger, metrics)
-       c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig 
!= nil, logger, metrics)
+       serviceNameResolver := internal.NewPulsarServiceNameResolver(url)
+
+       c.rpcClient = internal.NewRPCClient(url, serviceNameResolver, 
c.cnxPool, operationTimeout, logger, metrics)
+       c.lookupService = internal.NewLookupService(c.rpcClient, url, 
serviceNameResolver, tlsConfig != nil, logger, metrics)
        c.handlers = internal.NewClientHandlers()
 
        return c, nil
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 661eac2..65732ea 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+       "context"
        "crypto/tls"
        "fmt"
        "io/ioutil"
@@ -458,3 +459,58 @@ func anonymousNamespacePolicy() map[string]interface{} {
                "replication_clusters": []string{"standalone"},
        }
 }
+
+func TestRetryWithMultipleHosts(t *testing.T) {
+       // Multiple hosts include an unreachable port and the actual port to 
verify the retry logic
+       client, err := NewClient(ClientOptions{
+               URL: "pulsar://localhost:6600,localhost:6650",
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := "persistent://public/default/retry-multiple-hosts-" + 
generateRandomName()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       ctx := context.Background()
+       var msgIDs [][]byte
+
+       for i := 0; i < 10; i++ {
+               if msgID, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       assert.Nil(t, err)
+               } else {
+                       assert.NotNil(t, msgID)
+                       msgIDs = append(msgIDs, msgID.Serialize())
+               }
+       }
+
+       assert.Equal(t, 10, len(msgIDs))
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:                       topic,
+               SubscriptionName:            "retry-multi-hosts-sub",
+               Type:                        Shared,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               assert.Nil(t, err)
+               assert.Contains(t, msgIDs, msg.ID().Serialize())
+               consumer.Ack(msg)
+       }
+
+       err = consumer.Unsubscribe()
+       assert.Nil(t, err)
+
+}
diff --git a/pulsar/internal/connection_pool.go 
b/pulsar/internal/connection_pool.go
index 54d30b3..29e1267 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -103,6 +103,10 @@ func (p *connectionPool) GetConnection(logicalAddr 
*url.URL, physicalAddr *url.U
        }
 
        if err := cnx.waitUntilReady(); err != nil {
+               if !wasCached {
+                       p.pool.Delete(key)
+                       p.log.Debug("Removed failed connection from pool:", 
cnx.logicalAddr, cnx.physicalAddr)
+               }
                return nil, err
        }
        return cnx, nil
diff --git a/pulsar/internal/lookup_service.go 
b/pulsar/internal/lookup_service.go
index 65fa383..5b4e2ac 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -45,22 +45,22 @@ type LookupService interface {
 }
 
 type lookupService struct {
-       rpcClient  RPCClient
-       serviceURL *url.URL
-       tlsEnabled bool
-       log        log.Logger
-       metrics    *Metrics
+       rpcClient           RPCClient
+       serviceNameResolver ServiceNameResolver
+       tlsEnabled          bool
+       log                 log.Logger
+       metrics             *Metrics
 }
 
 // NewLookupService init a lookup service struct and return an object of 
LookupService.
-func NewLookupService(rpcClient RPCClient, serviceURL *url.URL,
+func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, 
serviceNameResolver ServiceNameResolver,
        tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService {
        return &lookupService{
-               rpcClient:  rpcClient,
-               serviceURL: serviceURL,
-               tlsEnabled: tlsEnabled,
-               log:        logger.SubLogger(log.Fields{"serviceURL": 
serviceURL}),
-               metrics:    metrics,
+               rpcClient:           rpcClient,
+               serviceNameResolver: serviceNameResolver,
+               tlsEnabled:          tlsEnabled,
+               log:                 logger.SubLogger(log.Fields{"serviceURL": 
serviceURL}),
+               metrics:             metrics,
        }
 }
 
@@ -78,7 +78,10 @@ func (ls *lookupService) getBrokerAddress(lr 
*pb.CommandLookupTopicResponse) (lo
 
        var physicalAddr *url.URL
        if lr.GetProxyThroughServiceUrl() {
-               physicalAddr = ls.serviceURL
+               physicalAddr, err = ls.serviceNameResolver.ResolveHost()
+               if err != nil {
+                       return nil, nil, err
+               }
        } else {
                physicalAddr = logicalAddress
        }
diff --git a/pulsar/internal/lookup_service_test.go 
b/pulsar/internal/lookup_service_test.go
index ad0e1bd..a3f9bfe 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -112,6 +112,7 @@ func responseType(r 
pb.CommandLookupTopicResponse_LookupType) *pb.CommandLookupT
 func TestLookupSuccess(t *testing.T) {
        url, err := url.Parse("pulsar://example:6650")
        assert.NoError(t, err)
+       serviceNameResolver := NewPulsarServiceNameResolver(url)
 
        ls := NewLookupService(&mockedLookupRPCClient{
                t: t,
@@ -131,7 +132,7 @@ func TestLookupSuccess(t *testing.T) {
                                BrokerServiceUrl: 
proto.String("pulsar://broker-1:6650"),
                        },
                },
-       }, url, false, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, serviceNameResolver, false, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -144,6 +145,7 @@ func TestLookupSuccess(t *testing.T) {
 func TestTlsLookupSuccess(t *testing.T) {
        url, err := url.Parse("pulsar+ssl://example:6651")
        assert.NoError(t, err)
+       serviceNameResolver := NewPulsarServiceNameResolver(url)
 
        ls := NewLookupService(&mockedLookupRPCClient{
                t: t,
@@ -163,7 +165,7 @@ func TestTlsLookupSuccess(t *testing.T) {
                                BrokerServiceUrlTls: 
proto.String("pulsar+ssl://broker-1:6651"),
                        },
                },
-       }, url, true, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, serviceNameResolver, true, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -176,6 +178,7 @@ func TestTlsLookupSuccess(t *testing.T) {
 func TestLookupWithProxy(t *testing.T) {
        url, err := url.Parse("pulsar://example:6650")
        assert.NoError(t, err)
+       serviceNameResolver := NewPulsarServiceNameResolver(url)
 
        ls := NewLookupService(&mockedLookupRPCClient{
                t: t,
@@ -196,7 +199,7 @@ func TestLookupWithProxy(t *testing.T) {
                                ProxyThroughServiceUrl: proto.Bool(true),
                        },
                },
-       }, url, false, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, serviceNameResolver, false, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -229,7 +232,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
                                ProxyThroughServiceUrl: proto.Bool(true),
                        },
                },
-       }, url, true, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, NewPulsarServiceNameResolver(url), true, 
log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -273,7 +276,7 @@ func TestLookupWithRedirect(t *testing.T) {
                                BrokerServiceUrl: 
proto.String("pulsar://broker-1:6650"),
                        },
                },
-       }, url, false, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, NewPulsarServiceNameResolver(url), false, 
log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -317,7 +320,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
                                BrokerServiceUrlTls: 
proto.String("pulsar+ssl://broker-1:6651"),
                        },
                },
-       }, url, true, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, NewPulsarServiceNameResolver(url), true, 
log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -350,7 +353,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
                                ProxyThroughServiceUrl: proto.Bool(false),
                        },
                },
-       }, url, false, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, NewPulsarServiceNameResolver(url), false, 
log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.Error(t, err)
@@ -378,7 +381,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
                                Authoritative: proto.Bool(true),
                        },
                },
-       }, url, false, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, NewPulsarServiceNameResolver(url), false, 
log.DefaultNopLogger(), NewMetricsProvider(map[string]string{}))
 
        lr, err := ls.Lookup("my-topic")
        assert.Error(t, err)
@@ -447,6 +450,7 @@ func (m mockedPartitionedTopicMetadataRPCClient) 
RequestOnCnx(cnx Connection, re
 func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
        url, err := url.Parse("pulsar://example:6650")
        assert.NoError(t, err)
+       serviceNameResolver := NewPulsarServiceNameResolver(url)
 
        ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
                t: t,
@@ -464,10 +468,43 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) 
{
                                Response:   
pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
                        },
                },
-       }, url, false, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+       }, url, serviceNameResolver, false, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
 
        metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
        assert.NoError(t, err)
        assert.NotNil(t, metadata)
        assert.Equal(t, metadata.GetPartitions(), uint32(1))
 }
+
+func TestLookupSuccessWithMultipleHosts(t *testing.T) {
+       url, err := url.Parse("pulsar://host1,host2,host3:6650")
+       assert.NoError(t, err)
+       serviceNameResolver := NewPulsarServiceNameResolver(url)
+
+       ls := NewLookupService(&mockedLookupRPCClient{
+               t: t,
+
+               expectedRequests: []pb.CommandLookupTopic{
+                       {
+                               RequestId:     proto.Uint64(1),
+                               Topic:         proto.String("my-topic"),
+                               Authoritative: proto.Bool(false),
+                       },
+               },
+               mockedResponses: []pb.CommandLookupTopicResponse{
+                       {
+                               RequestId:        proto.Uint64(1),
+                               Response:         
responseType(pb.CommandLookupTopicResponse_Connect),
+                               Authoritative:    proto.Bool(true),
+                               BrokerServiceUrl: 
proto.String("pulsar://broker-1:6650"),
+                       },
+               },
+       }, url, serviceNameResolver, false, log.DefaultNopLogger(), 
NewMetricsProvider(map[string]string{}))
+
+       lr, err := ls.Lookup("my-topic")
+       assert.NoError(t, err)
+       assert.NotNil(t, lr)
+
+       assert.Equal(t, "pulsar://broker-1:6650", lr.LogicalAddr.String())
+       assert.Equal(t, "pulsar://broker-1:6650", lr.PhysicalAddr.String())
+}
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 0bb482c..b51e4e3 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -56,7 +56,7 @@ type RPCClient interface {
 }
 
 type rpcClient struct {
-       serviceURL          *url.URL
+       serviceNameResolver ServiceNameResolver
        pool                ConnectionPool
        requestTimeout      time.Duration
        requestIDGenerator  uint64
@@ -66,22 +66,26 @@ type rpcClient struct {
        metrics             *Metrics
 }
 
-func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
+func NewRPCClient(serviceURL *url.URL, serviceNameResolver 
ServiceNameResolver, pool ConnectionPool,
        requestTimeout time.Duration, logger log.Logger, metrics *Metrics) 
RPCClient {
        return &rpcClient{
-               serviceURL:     serviceURL,
-               pool:           pool,
-               requestTimeout: requestTimeout,
-               log:            logger.SubLogger(log.Fields{"serviceURL": 
serviceURL}),
-               metrics:        metrics,
+               serviceNameResolver: serviceNameResolver,
+               pool:                pool,
+               requestTimeout:      requestTimeout,
+               log:                 logger.SubLogger(log.Fields{"serviceURL": 
serviceURL}),
+               metrics:             metrics,
        }
 }
 
 func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType 
pb.BaseCommand_Type,
        message proto.Message) (*RPCResult, error) {
-
-       rpcResult, err := c.Request(c.serviceURL, c.serviceURL, requestID, 
cmdType, message)
-       if _, ok := err.(net.Error); ok {
+       host, err := c.serviceNameResolver.ResolveHost()
+       if err != nil {
+               c.log.Errorf("request host resolve failed with error: {%v}", 
err)
+               return nil, err
+       }
+       rpcResult, err := c.Request(host, host, requestID, cmdType, message)
+       if _, ok := err.(net.Error); ok || (err != nil && err.Error() == 
"connection error") {
                // We can retry this kind of requests over a connection error 
because they're
                // not specific to a particular broker.
                backoff := Backoff{100 * time.Millisecond}
@@ -92,9 +96,13 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, 
cmdType pb.BaseCommand_
                        retryTime = backoff.Next()
                        c.log.Debugf("Retrying request in {%v} with timeout in 
{%v}", retryTime, c.requestTimeout)
                        time.Sleep(retryTime)
-
-                       rpcResult, err = c.Request(c.serviceURL, c.serviceURL, 
requestID, cmdType, message)
-                       if _, ok := err.(net.Error); ok {
+                       host, err = c.serviceNameResolver.ResolveHost()
+                       if err != nil {
+                               c.log.Errorf("Retrying request host resolve 
failed with error: {%v}", err)
+                               continue
+                       }
+                       rpcResult, err = c.Request(host, host, requestID, 
cmdType, message)
+                       if _, ok := err.(net.Error); ok || (err != nil && 
err.Error() == "connection error") {
                                continue
                        } else {
                                // We either succeeded or encountered a non 
connection error
@@ -102,7 +110,6 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, 
cmdType pb.BaseCommand_
                        }
                }
        }
-
        return rpcResult, err
 }
 
diff --git a/pulsar/internal/service_name_resolver.go 
b/pulsar/internal/service_name_resolver.go
new file mode 100644
index 0000000..3b1209c
--- /dev/null
+++ b/pulsar/internal/service_name_resolver.go
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "errors"
+       "fmt"
+       "math/rand"
+       "net/url"
+       "sync/atomic"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+)
+
+type ServiceNameResolver interface {
+       ResolveHost() (*url.URL, error)
+       ResolveHostURI() (*PulsarServiceURI, error)
+       UpdateServiceURL(url *url.URL) error
+       GetServiceURI() *PulsarServiceURI
+       GetServiceURL() *url.URL
+       GetAddressList() []*url.URL
+}
+
+type pulsarServiceNameResolver struct {
+       ServiceURI   *PulsarServiceURI
+       ServiceURL   *url.URL
+       CurrentIndex int32
+       AddressList  []*url.URL
+}
+
+func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver {
+       r := &pulsarServiceNameResolver{}
+       err := r.UpdateServiceURL(url)
+       if err != nil {
+               log.Errorf("create pulsar service name resolver failed : %v", 
err)
+       }
+       return r
+}
+
+func (r *pulsarServiceNameResolver) ResolveHost() (*url.URL, error) {
+       if r.AddressList == nil {
+               return nil, errors.New("no service url is provided yet")
+       }
+       if len(r.AddressList) == 0 {
+               return nil, fmt.Errorf("no hosts found for service url : %v", 
r.ServiceURL)
+       }
+       if len(r.AddressList) == 1 {
+               return r.AddressList[0], nil
+       }
+       idx := (r.CurrentIndex + 1) % int32(len(r.AddressList))
+       atomic.StoreInt32(&r.CurrentIndex, idx)
+       return r.AddressList[idx], nil
+}
+
+func (r *pulsarServiceNameResolver) ResolveHostURI() (*PulsarServiceURI, 
error) {
+       host, err := r.ResolveHost()
+       if err != nil {
+               return nil, err
+       }
+       hostURL := host.Scheme + "://" + host.Hostname() + ":" + host.Port()
+       return NewPulsarServiceURIFromURIString(hostURL)
+}
+
+func (r *pulsarServiceNameResolver) UpdateServiceURL(u *url.URL) error {
+       uri, err := NewPulsarServiceURIFromURL(u)
+       if err != nil {
+               log.Errorf("invalid service-url instance %s provided %v", u, 
err)
+               return err
+       }
+
+       hosts := uri.ServiceHosts
+       addresses := []*url.URL{}
+       for _, host := range hosts {
+               hostURL := uri.URL.Scheme + "://" + host
+               u, err := url.Parse(hostURL)
+               if err != nil {
+                       log.Errorf("invalid host-url %s provided %v", hostURL, 
err)
+                       return err
+               }
+               addresses = append(addresses, u)
+       }
+       r.AddressList = addresses
+       r.ServiceURL = u
+       r.ServiceURI = uri
+       rand.Seed(time.Now().Unix()) // initialize global pseudo random 
generator
+       atomic.StoreInt32(&r.CurrentIndex, int32(rand.Intn(len(addresses))))
+       return nil
+}
+
+func (r *pulsarServiceNameResolver) GetServiceURI() *PulsarServiceURI {
+       return r.ServiceURI
+}
+
+func (r *pulsarServiceNameResolver) GetServiceURL() *url.URL {
+       return r.ServiceURL
+}
+
+func (r *pulsarServiceNameResolver) GetAddressList() []*url.URL {
+       return r.AddressList
+}
diff --git a/pulsar/internal/service_name_resolver_test.go 
b/pulsar/internal/service_name_resolver_test.go
new file mode 100644
index 0000000..e190633
--- /dev/null
+++ b/pulsar/internal/service_name_resolver_test.go
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "net/url"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestResolveBeforeUpdateServiceUrl(t *testing.T) {
+       resolver := NewPulsarServiceNameResolver(nil)
+       u, err := resolver.ResolveHost()
+       assert.Nil(t, u)
+       assert.NotNil(t, err)
+       assert.EqualError(t, err, "no service url is provided yet")
+}
+
+func TestResolveUriBeforeUpdateServiceUrl(t *testing.T) {
+       resolver := NewPulsarServiceNameResolver(nil)
+       u, err := resolver.ResolveHostURI()
+       assert.Nil(t, u)
+       assert.NotNil(t, err)
+       assert.EqualError(t, err, "no service url is provided yet")
+}
+
+func TestUpdateInvalidServiceUrl(t *testing.T) {
+       resolver := NewPulsarServiceNameResolver(nil)
+       url, _ := url.Parse("pulsar:///")
+       err := resolver.UpdateServiceURL(url)
+       assert.NotNil(t, err)
+       assert.Empty(t, resolver.GetServiceURL())
+       assert.Nil(t, resolver.GetServiceURI())
+}
+
+func TestSimpleHostUrl(t *testing.T) {
+       resolver := NewPulsarServiceNameResolver(nil)
+       serviceURL, _ := url.Parse("pulsar://host1:6650")
+       err := resolver.UpdateServiceURL(serviceURL)
+       assert.Nil(t, err)
+       assert.Equal(t, serviceURL, resolver.GetServiceURL())
+       expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+       assert.Nil(t, err)
+       assert.Equal(t, expectedURI, resolver.GetServiceURI())
+       actualHost, err := resolver.ResolveHost()
+       assert.Nil(t, err)
+       assert.Equal(t, "host1", actualHost.Hostname())
+       assert.Equal(t, "6650", actualHost.Port())
+
+       newServiceURL, _ := url.Parse("pulsar://host2:6650")
+       err = resolver.UpdateServiceURL(newServiceURL)
+       assert.Nil(t, err)
+       assert.Equal(t, newServiceURL, resolver.GetServiceURL())
+       expectedURI, err = NewPulsarServiceURIFromURL(newServiceURL)
+       assert.Nil(t, err)
+       assert.Equal(t, expectedURI, resolver.GetServiceURI())
+       actualHost, err = resolver.ResolveHost()
+       assert.Nil(t, err)
+       assert.Equal(t, "host2", actualHost.Hostname())
+       assert.Equal(t, "6650", actualHost.Port())
+}
+
+func TestMultipleHostsUrl(t *testing.T) {
+       resolver := NewPulsarServiceNameResolver(nil)
+       serviceURL, _ := url.Parse("pulsar://host1:6650,host2:6650")
+       err := resolver.UpdateServiceURL(serviceURL)
+       assert.Nil(t, err)
+       assert.Equal(t, serviceURL, resolver.GetServiceURL())
+       expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+       assert.Nil(t, err)
+       assert.Equal(t, expectedURI, resolver.GetServiceURI())
+       host1, _ := url.Parse("pulsar://host1:6650")
+       host2, _ := url.Parse("pulsar://host2:6650")
+       host1uri, _ := NewPulsarServiceURIFromURIString("pulsar://host1:6650")
+       host2uri, _ := NewPulsarServiceURIFromURIString("pulsar://host2:6650")
+       assert.Contains(t, resolver.GetAddressList(), host1)
+       assert.Contains(t, resolver.GetAddressList(), host2)
+       hosts := []*url.URL{host1, host2}
+       hosturis := []*PulsarServiceURI{host1uri, host2uri}
+       for i := 0; i < 10; i++ {
+               host, err := resolver.ResolveHost()
+               assert.Nil(t, err)
+               hosturi, err := resolver.ResolveHostURI()
+               assert.Nil(t, err)
+               assert.Contains(t, hosts, host)
+               assert.Contains(t, hosturis, hosturi)
+       }
+}
+
+func TestMultipleHostsTlsUrl(t *testing.T) {
+       resolver := NewPulsarServiceNameResolver(nil)
+       serviceURL, _ := url.Parse("pulsar+ssl://host1:6651,host2:6651")
+       err := resolver.UpdateServiceURL(serviceURL)
+       assert.Nil(t, err)
+       assert.Equal(t, serviceURL, resolver.GetServiceURL())
+       expectedURI, err := NewPulsarServiceURIFromURL(serviceURL)
+       assert.Nil(t, err)
+       assert.Equal(t, expectedURI, resolver.GetServiceURI())
+       host1, _ := url.Parse("pulsar+ssl://host1:6651")
+       host2, _ := url.Parse("pulsar+ssl://host2:6651")
+       host1uri, _ := 
NewPulsarServiceURIFromURIString("pulsar+ssl://host1:6651")
+       host2uri, _ := 
NewPulsarServiceURIFromURIString("pulsar+ssl://host2:6651")
+       assert.Contains(t, resolver.GetAddressList(), host1)
+       assert.Contains(t, resolver.GetAddressList(), host2)
+       hosts := []*url.URL{host1, host2}
+       hosturis := []*PulsarServiceURI{host1uri, host2uri}
+       for i := 0; i < 10; i++ {
+               host, err := resolver.ResolveHost()
+               assert.Nil(t, err)
+               hosturi, err := resolver.ResolveHostURI()
+               assert.Nil(t, err)
+               assert.Contains(t, hosts, host)
+               assert.Contains(t, hosturis, hosturi)
+       }
+}
diff --git a/pulsar/internal/service_uri.go b/pulsar/internal/service_uri.go
new file mode 100644
index 0000000..7b75339
--- /dev/null
+++ b/pulsar/internal/service_uri.go
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "errors"
+       "fmt"
+       "net"
+       "net/url"
+       "strings"
+
+       log "github.com/sirupsen/logrus"
+)
+
+const (
+       BinaryService = "pulsar" // service name matched by getServicePort for the "pulsar" scheme
+       HTTPService   = "http" // service name matched by getServicePort for the "http" scheme
+       HTTPSService  = "https" // service name matched by getServicePort for the "https" scheme
+       SSLService    = "ssl" // service info that selects BinaryTLSPort for the "pulsar" service
+       BinaryPort    = 6650 // default port for "pulsar" when no service info is given
+       BinaryTLSPort = 6651 // default port for "pulsar" with the single service info "ssl"
+       HTTPPort      = 80 // default port for "http"
+       HTTPSPort     = 443 // default port for "https"
+)
+
+type PulsarServiceURI struct { // parsed form of a service URI that may list several hosts
+       ServiceName  string // first "+"-separated part of the lowercased URL scheme
+       ServiceInfos []string // remaining "+"-separated parts of the scheme, e.g. "ssl"
+       ServiceHosts []string // "host:port" entries as returned by validateHostName
+       servicePath  string // path component of the parsed URL
+       URL          *url.URL // URL that was handed to fromURL when this value was built
+}
+
+// NewPulsarServiceURIFromURIString parses uri with fromString. A parse
+// failure is logged through log.Error and returned with a nil result.
+func NewPulsarServiceURIFromURIString(uri string) (*PulsarServiceURI, error) {
+       serviceURI, parseErr := fromString(uri)
+       if parseErr == nil {
+               return serviceURI, nil
+       }
+       log.Error(parseErr)
+       return nil, parseErr
+}
+
+// NewPulsarServiceURIFromURL converts an already parsed URL with fromURL. A
+// conversion failure is logged through log.Error and returned with a nil
+// result.
+func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error) {
+       serviceURI, convErr := fromURL(url)
+       if convErr == nil {
+               return serviceURI, nil
+       }
+       log.Error(convErr)
+       return nil, convErr
+}
+
+func fromString(uriStr string) (*PulsarServiceURI, error) {
+       if uriStr == "" || len(uriStr) == 0 {
+               return nil, errors.New("service uriStr string is null")
+       }
+       if strings.Contains(uriStr, "[") && strings.Contains(uriStr, "]") {
+               // deal with ipv6 address
+               hosts := strings.FieldsFunc(uriStr, splitURI)
+               if len(hosts) > 1 {
+                       // deal with ipv6 address
+                       firstHost := hosts[0]
+                       lastHost := hosts[len(hosts)-1]
+                       hasPath := strings.Contains(lastHost, "/")
+                       path := ""
+                       if hasPath {
+                               idx := strings.Index(lastHost, "/")
+                               path = lastHost[idx:]
+                       }
+                       firstHost += path
+                       url, err := url.Parse(firstHost)
+                       if err != nil {
+                               return nil, err
+                       }
+                       serviceURI, err := fromURL(url)
+                       if err != nil {
+                               return nil, err
+                       }
+                       var mHosts []string
+                       var multiHosts []string
+                       mHosts = append(mHosts, serviceURI.ServiceHosts[0])
+                       mHosts = append(mHosts, hosts[1:]...)
+
+                       for _, v := range mHosts {
+                               h, err := 
validateHostName(serviceURI.ServiceName, serviceURI.ServiceInfos, v)
+                               if err == nil {
+                                       multiHosts = append(multiHosts, h)
+                               } else {
+                                       return nil, err
+                               }
+                       }
+
+                       return &PulsarServiceURI{
+                               serviceURI.ServiceName,
+                               serviceURI.ServiceInfos,
+                               multiHosts,
+                               serviceURI.servicePath,
+                               serviceURI.URL,
+                       }, nil
+               }
+       }
+
+       url, err := url.Parse(uriStr)
+       if err != nil {
+               return nil, err
+       }
+
+       return fromURL(url)
+}
+
+// fromURL converts a parsed URL into a PulsarServiceURI.
+//
+// The lowercased scheme is split on "+": the first part becomes ServiceName
+// and the rest ServiceInfos. url.Host is split on "," and ";" and each entry
+// is normalised by validateHostName.
+//
+// It returns an error when url is nil, when url.Host is empty, or when any
+// host entry fails validation.
+func fromURL(url *url.URL) (*PulsarServiceURI, error) {
+       if url == nil {
+               return nil, errors.New("service url instance is null")
+       }
+       if url.Host == "" {
+               return nil, errors.New("service host is null")
+       }
+
+       var (
+               serviceName  string
+               serviceInfos []string
+       )
+       if url.Scheme != "" {
+               schemeParts := strings.Split(strings.ToLower(url.Scheme), "+")
+               serviceName, serviceInfos = schemeParts[0], schemeParts[1:]
+       }
+
+       var serviceHosts []string
+       for _, rawHost := range strings.FieldsFunc(url.Host, splitURI) {
+               host, err := validateHostName(serviceName, serviceInfos, rawHost)
+               if err != nil {
+                       return nil, err
+               }
+               serviceHosts = append(serviceHosts, host)
+       }
+
+       return &PulsarServiceURI{
+               ServiceName:  serviceName,
+               ServiceInfos: serviceInfos,
+               ServiceHosts: serviceHosts,
+               servicePath:  url.Path,
+               URL:          url,
+       }, nil
+}
+
+// splitURI reports whether r is one of the host-list separators "," or ";".
+// It is used as the predicate for strings.FieldsFunc.
+func splitURI(r rune) bool {
+       switch r {
+       case ',', ';':
+               return true
+       default:
+               return false
+       }
+}
+
+// validateHostName normalises one host entry to "host:port".
+//
+// hostname is parsed as the authority of a dummy URL. A host written with
+// brackets keeps them in the result. When hostname carries no port, the
+// default from getServicePort(serviceName, serviceInfos) is used.
+//
+// It returns an error when the entry cannot be parsed, when the host is
+// empty, when no default port is known, or when net.SplitHostPort rejects
+// the assembled result.
+func validateHostName(serviceName string, serviceInfos []string, hostname string) (string, error) {
+       // The scheme prefix is only there so url.Parse treats hostname as an
+       // authority; a successful parse therefore always has a scheme.
+       uri, err := url.Parse("dummyscheme://" + hostname)
+       if err != nil {
+               return "", err
+       }
+
+       host := uri.Hostname()
+       if strings.Contains(hostname, "[") && strings.Contains(hostname, "]") {
+               // Hostname() strips the brackets; put them back.
+               host = fmt.Sprintf("[%s]", host)
+       }
+       if host == "" {
+               return "", errors.New("Invalid hostname : " + hostname)
+       }
+
+       port := uri.Port()
+       if port == "" {
+               p := getServicePort(serviceName, serviceInfos)
+               if p == -1 {
+                       // -1 is getServicePort's "no default known" result.
+                       return "", fmt.Errorf("invalid port : %d", p)
+               }
+               port = fmt.Sprint(p)
+       }
+
+       result := host + ":" + port
+       if _, _, err = net.SplitHostPort(result); err != nil {
+               return "", err
+       }
+       return result, nil
+}
+
+// getServicePort returns the default port for a service name and its service
+// infos, or -1 when the combination has no default. The name and the single
+// "ssl" info are compared after lowercasing.
+func getServicePort(serviceName string, serviceInfos []string) int {
+       name := strings.ToLower(serviceName)
+       if name == HTTPService {
+               return HTTPPort
+       }
+       if name == HTTPSService {
+               return HTTPSPort
+       }
+       if name != BinaryService {
+               return -1
+       }
+
+       // Binary service: plain when no infos are given, TLS for exactly "ssl".
+       switch len(serviceInfos) {
+       case 0:
+               return BinaryPort
+       case 1:
+               if strings.ToLower(serviceInfos[0]) == SSLService {
+                       return BinaryTLSPort
+               }
+       }
+       return -1
+}
diff --git a/pulsar/internal/service_uri_test.go 
b/pulsar/internal/service_uri_test.go
new file mode 100644
index 0000000..445b325
--- /dev/null
+++ b/pulsar/internal/service_uri_test.go
@@ -0,0 +1,171 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package internal
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+// TestInvalidServiceUris runs testInvalidServiceURI over URI strings that are
+// each expected to be rejected.
+func TestInvalidServiceUris(t *testing.T) {
+       invalid := [...]string{
+               "://localhost:6650",              // missing scheme
+               "pulsar:///",                     // missing authority
+               "pulsar://localhost:6650:6651/",  // invalid hostname pair
+               "pulsar://localhost:xyz/",        // invalid port
+               "pulsar://localhost:-6650/",      // negative port
+               "pulsar://fec0:0:0:ffff::1:6650", // missing brackets
+       }
+
+       for idx := 0; idx < len(invalid); idx++ {
+               testInvalidServiceURI(t, invalid[idx])
+       }
+}
+
+// TestEmptyServiceUriString expects a nil result and a non-nil error for an
+// empty URI string.
+func TestEmptyServiceUriString(t *testing.T) {
+       parsed, parseErr := NewPulsarServiceURIFromURIString("")
+       assert.Nil(t, parsed)
+       assert.NotNil(t, parseErr)
+}
+
+// TestNullServiceUrlInstance expects a nil result and a non-nil error when a
+// nil URL is converted.
+func TestNullServiceUrlInstance(t *testing.T) {
+       converted, convErr := NewPulsarServiceURIFromURL(nil)
+       assert.Nil(t, converted)
+       assert.NotNil(t, convErr)
+}
+
+// TestMissingServiceName parses a scheme-less URI and expects an empty
+// service name with the host and path still extracted.
+func TestMissingServiceName(t *testing.T) {
+       wantHosts := []string{"localhost:6650"}
+       assertServiceURI(t, "//localhost:6650/path/to/namespace", "", nil, wantHosts,
+               "/path/to/namespace", "")
+}
+
+// TestEmptyPath parses a URI without a path and expects an empty service path.
+func TestEmptyPath(t *testing.T) {
+       wantHosts := []string{"localhost:6650"}
+       assertServiceURI(t, "pulsar://localhost:6650", "pulsar", nil, wantHosts, "", "")
+}
+
+// TestRootPath parses a URI ending in "/" and expects "/" as the service path.
+func TestRootPath(t *testing.T) {
+       wantHosts := []string{"localhost:6650"}
+       assertServiceURI(t, "pulsar://localhost:6650/", "pulsar", nil, wantHosts, "/", "")
+}
+
+// TestUserInfo parses a URI with a user name in front of the host and expects
+// that user name on the resulting URL.
+func TestUserInfo(t *testing.T) {
+       wantHosts := []string{"localhost:6650"}
+       assertServiceURI(t, "pulsar://pulsaruser@localhost:6650/path/to/namespace", "pulsar", nil,
+               wantHosts, "/path/to/namespace", "pulsaruser")
+}
+
+// TestIpv6Uri parses a single bracketed IPv6 host with port, user and path.
+func TestIpv6Uri(t *testing.T) {
+       wantHosts := []string{"[fec0:0:0:ffff::1]:6650"}
+       assertServiceURI(t, "pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650/path/to/namespace",
+               "pulsar", nil, wantHosts, "/path/to/namespace", "pulsaruser")
+}
+
+// TestIpv6UriWithoutPulsarPort parses a bracketed IPv6 host without a port
+// and expects port 6650 to be filled in.
+func TestIpv6UriWithoutPulsarPort(t *testing.T) {
+       wantHosts := []string{"[fec0:0:0:ffff::1]:6650"}
+       assertServiceURI(t, "pulsar://[fec0:0:0:ffff::1]/path/to/namespace", "pulsar", nil,
+               wantHosts, "/path/to/namespace", "")
+}
+
+func TestMultiIpv6Uri(t *testing.T) {
+       serviceURI := 
"pulsar://pulsaruser@[fec0:0:0:ffff::1]:6650,[fec0:0:0:ffff::2]:6650;[fec0:0:0:ffff::3]:6650"
 +
+               "/path/to/namespace"
+       assertServiceURI(t, serviceURI, "pulsar", nil,
+               []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", 
"[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace",
+               "pulsaruser")
+}
+
+func TestMultiIpv6UriWithoutPulsarPort(t *testing.T) {
+       serviceURI := 
"pulsar://pulsaruser@[fec0:0:0:ffff::1],[fec0:0:0:ffff::2];[fec0:0:0:ffff::3]/path/to/namespace"
+       assertServiceURI(t, serviceURI, "pulsar", nil,
+               []string{"[fec0:0:0:ffff::1]:6650", "[fec0:0:0:ffff::2]:6650", 
"[fec0:0:0:ffff::3]:6650"}, "/path/to/namespace",
+               "pulsaruser")
+}
+
+// TestMultipleHostsSemiColon parses three hosts separated by ";".
+func TestMultipleHostsSemiColon(t *testing.T) {
+       wantHosts := []string{"host1:6650", "host2:6650", "host3:6650"}
+       assertServiceURI(t, "pulsar://host1:6650;host2:6650;host3:6650/path/to/namespace",
+               "pulsar", nil, wantHosts, "/path/to/namespace", "")
+}
+
+// TestMultipleHostsComma parses three hosts separated by ",".
+func TestMultipleHostsComma(t *testing.T) {
+       wantHosts := []string{"host1:6650", "host2:6650", "host3:6650"}
+       assertServiceURI(t, "pulsar://host1:6650,host2:6650,host3:6650/path/to/namespace",
+               "pulsar", nil, wantHosts, "/path/to/namespace", "")
+}
+
+// TestMultipleHostsWithoutPulsarPorts parses port-less hosts under the
+// "pulsar" scheme and expects port 6650 on each.
+func TestMultipleHostsWithoutPulsarPorts(t *testing.T) {
+       wantHosts := []string{"host1:6650", "host2:6650", "host3:6650"}
+       assertServiceURI(t, "pulsar://host1,host2,host3/path/to/namespace", "pulsar", nil,
+               wantHosts, "/path/to/namespace", "")
+}
+
+// TestMultipleHostsWithoutPulsarTlsPorts parses port-less hosts under the
+// "pulsar+ssl" scheme and expects port 6651 on each plus the "ssl" info.
+func TestMultipleHostsWithoutPulsarTlsPorts(t *testing.T) {
+       wantInfos := []string{"ssl"}
+       wantHosts := []string{"host1:6651", "host2:6651", "host3:6651"}
+       assertServiceURI(t, "pulsar+ssl://host1,host2,host3/path/to/namespace", "pulsar",
+               wantInfos, wantHosts, "/path/to/namespace", "")
+}
+
+// TestMultipleHostsWithoutHttpPorts parses port-less hosts under the "http"
+// scheme and expects port 80 on each.
+func TestMultipleHostsWithoutHttpPorts(t *testing.T) {
+       wantHosts := []string{"host1:80", "host2:80", "host3:80"}
+       assertServiceURI(t, "http://host1,host2,host3/path/to/namespace", "http", nil,
+               wantHosts, "/path/to/namespace", "")
+}
+
+// TestMultipleHostsWithoutHttpsPorts parses port-less hosts under the "https"
+// scheme and expects port 443 on each.
+func TestMultipleHostsWithoutHttpsPorts(t *testing.T) {
+       wantHosts := []string{"host1:443", "host2:443", "host3:443"}
+       assertServiceURI(t, "https://host1,host2,host3/path/to/namespace", "https", nil,
+               wantHosts, "/path/to/namespace", "")
+}
+
+// TestMultipleHostsMixedPorts parses hosts that each carry a different
+// explicit port and expects those ports to be kept.
+func TestMultipleHostsMixedPorts(t *testing.T) {
+       wantHosts := []string{"host1:6640", "host2:6650", "host3:6660"}
+       assertServiceURI(t, "pulsar://host1:6640,host2:6650,host3:6660/path/to/namespace",
+               "pulsar", nil, wantHosts, "/path/to/namespace", "")
+}
+
+// TestMultipleHostsMixed parses a host list where only some entries carry a
+// port and expects 6650 for the entry without one.
+func TestMultipleHostsMixed(t *testing.T) {
+       wantHosts := []string{"host1:6640", "host2:6650", "host3:6660"}
+       assertServiceURI(t, "pulsar://host1:6640,host2,host3:6660/path/to/namespace",
+               "pulsar", nil, wantHosts, "/path/to/namespace", "")
+}
+
+// TestUserInfoWithMultipleHosts parses a ";"-separated host list preceded by
+// a user name and expects both the hosts and the user name.
+func TestUserInfoWithMultipleHosts(t *testing.T) {
+       wantHosts := []string{"host1:6650", "host2:6650", "host3:6650"}
+       assertServiceURI(t, "pulsar://pulsaruser@host1:6650;host2:6650;host3:6650/path/to/namespace",
+               "pulsar", nil, wantHosts, "/path/to/namespace", "pulsaruser")
+}
+
+// testInvalidServiceURI parses serviceURI, logs which URI is under test, and
+// asserts that parsing produced no result and a non-nil error.
+func testInvalidServiceURI(t *testing.T, serviceURI string) {
+       parsed, parseErr := NewPulsarServiceURIFromURIString(serviceURI)
+       t.Logf("testInvalidServiceURI %s", serviceURI)
+       assert.Nil(t, parsed)
+       assert.NotNil(t, parseErr)
+}
+
+// assertServiceURI parses serviceURI and compares the result field by field
+// with the expected values.
+//
+// The user name is only compared when expectedServiceUser is non-empty.
+// Service infos and hosts are compared without regard to order. If parsing
+// fails, the failure is recorded and the remaining checks are skipped, since
+// they would dereference a nil result.
+func assertServiceURI(t *testing.T, serviceURI, expectedServiceName string,
+       expectedServiceInfo, expectedServiceHosts []string, expectedServicePath, expectedServiceUser string) {
+       t.Helper()
+
+       uri, err := NewPulsarServiceURIFromURIString(serviceURI)
+       // Check the parsed value, not the input string, and stop before touching
+       // a nil uri.
+       if !assert.Nil(t, err) || !assert.NotNil(t, uri) {
+               return
+       }
+
+       assert.Equal(t, expectedServiceName, uri.ServiceName)
+       assert.Equal(t, expectedServicePath, uri.servicePath)
+       if expectedServiceUser != "" {
+               assert.Equal(t, expectedServiceUser, uri.URL.User.Username())
+       }
+       assert.ElementsMatch(t, expectedServiceInfo, uri.ServiceInfos)
+       assert.ElementsMatch(t, expectedServiceHosts, uri.ServiceHosts)
+}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 5708cad..4d62cac 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -621,7 +621,7 @@ func TestProducerMetadata(t *testing.T) {
        }
        producer, err := client.CreateProducer(ProducerOptions{
                Topic:      topic,
-               Name:       "my-producer",
+               Name:       "meta-data-producer",
                Properties: props,
        })
        if err != nil {
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index cd97964..f72ba1d 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -586,3 +586,55 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) {
 
        assert.False(t, reader.HasNext())
 }
+
+// TestReaderWithMultiHosts creates a client from a two-host service URL,
+// publishes ten messages and reads them back from the earliest position,
+// checking payload order and the total count.
+func TestReaderWithMultiHosts(t *testing.T) {
+       // The host list holds an unreachable port followed by the actual port,
+       // so that the retry logic is exercised.
+       client, err := NewClient(ClientOptions{
+               URL: "pulsar://localhost:6600,localhost:6650",
+       })
+       // Stop here on failure: deferring Close on a nil client would panic.
+       if !assert.Nil(t, err) {
+               return
+       }
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       if !assert.Nil(t, err) {
+               return
+       }
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               msgID, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.NoError(t, err)
+               assert.NotNil(t, msgID)
+       }
+
+       // create reader starting from the earliest message
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: EarliestMessageID(),
+       })
+       if !assert.Nil(t, err) {
+               return
+       }
+       defer reader.Close()
+
+       i := 0
+       for reader.HasNext() {
+               msg, err := reader.Next(context.Background())
+               // Leave the loop on a read error instead of using msg.
+               if !assert.NoError(t, err) {
+                       break
+               }
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+
+               i++
+       }
+
+       assert.Equal(t, 10, i)
+}

Reply via email to