This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ab75cd  define logger interface and add Logger field to ClientOptions 
(#323)
3ab75cd is described below

commit 3ab75cdefb8758efbced97339621f13c7b4d4b15
Author: Shohi Wang <[email protected]>
AuthorDate: Fri Oct 9 11:08:08 2020 +0800

    define logger interface and add Logger field to ClientOptions (#323)
    
    ### Motivation
    Enable users to configure the logger used by the client and to supply their 
own implementation. If no logger is provided, a wrapped 
`logrus.StandardLogger()` will be used. <s>This PR only solved part of the 
problem mentioned in the issue 
https://github.com/apache/pulsar-client-go/issues/228.</s>
    
    ### Modifications
    
    * define `Logger` and `Entry` interfaces used by the client
    * add `Logger` field to ClientOptions
    * add `logger` field to internal structures
    * provide a logger implementation backed by logrus
    * implement a no-op logger
---
 pulsar/client.go                       |   7 ++
 pulsar/client_impl.go                  |  21 +++--
 pulsar/consumer_impl.go                |  11 ++-
 pulsar/consumer_multitopic.go          |   6 +-
 pulsar/consumer_partition.go           |  19 ++---
 pulsar/consumer_regex.go               |  28 +++----
 pulsar/consumer_regex_test.go          |   9 ++-
 pulsar/dlq_router.go                   |   9 ++-
 pulsar/internal/batch_builder.go       |  16 ++--
 pulsar/internal/commands.go            |  13 ++-
 pulsar/internal/connection.go          |  39 ++++++---
 pulsar/internal/connection_pool.go     |  22 ++++--
 pulsar/internal/lookup_service.go      |  18 +++--
 pulsar/internal/lookup_service_test.go |  20 ++---
 pulsar/internal/rpc_client.go          |  12 +--
 pulsar/log/log.go                      |  52 ++++++++++++
 pulsar/log/logger.go                   |  67 ++++++++++++++++
 pulsar/log/wrapper_logrus.go           | 140 +++++++++++++++++++++++++++++++++
 pulsar/negative_acks_tracker.go        |  12 +--
 pulsar/negative_acks_tracker_test.go   |   5 +-
 pulsar/producer_impl.go                |   8 +-
 pulsar/producer_partition.go           |  25 +++---
 pulsar/reader_impl.go                  |   9 +--
 pulsar/retry_router.go                 |   9 ++-
 24 files changed, 442 insertions(+), 135 deletions(-)

diff --git a/pulsar/client.go b/pulsar/client.go
index 2253607..ce9db91 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -22,6 +22,7 @@ import (
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal/auth"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 func NewClient(options ClientOptions) (Client, error) {
@@ -102,6 +103,12 @@ type ClientOptions struct {
 
        // Max number of connections to a single broker that will kept in the 
pool. (Default: 1 connection)
        MaxConnectionsPerBroker int
+
+       // Configure the logger used by the client.
+       // By default, a wrapped logrus.StandardLogger will be used, namely,
+       // log.NewLoggerWithLogrus(logrus.StandardLogger())
+       // FIXME: use `logger` as internal field name instead of `log` as it's 
more idiomatic
+       Logger log.Logger
 }
 
 type Client interface {
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index e0d6f47..20939c9 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -25,11 +25,12 @@ import (
 
        "github.com/gogo/protobuf/proto"
 
-       log "github.com/sirupsen/logrus"
+       "github.com/sirupsen/logrus"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsar/internal/auth"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 const (
@@ -42,16 +43,25 @@ type client struct {
        rpcClient     internal.RPCClient
        handlers      internal.ClientHandlers
        lookupService internal.LookupService
+
+       log log.Logger
 }
 
 func newClient(options ClientOptions) (Client, error) {
+       var logger log.Logger
+       if options.Logger != nil {
+               logger = options.Logger
+       } else {
+               logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
+       }
+
        if options.URL == "" {
                return nil, newError(ResultInvalidConfiguration, "URL is 
required for client")
        }
 
        url, err := url.Parse(options.URL)
        if err != nil {
-               log.WithError(err).Error("Failed to parse service URL")
+               logger.WithError(err).Error("Failed to parse service URL")
                return nil, newError(ResultInvalidConfiguration, "Invalid 
service URL")
        }
 
@@ -101,10 +111,11 @@ func newClient(options ClientOptions) (Client, error) {
        }
 
        c := &client{
-               cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, 
connectionTimeout, maxConnectionsPerHost),
+               cnxPool: internal.NewConnectionPool(tlsConfig, authProvider, 
connectionTimeout, maxConnectionsPerHost, logger),
+               log:     logger,
        }
-       c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout)
-       c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig 
!= nil)
+       c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, 
logger)
+       c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig 
!= nil, logger)
        c.handlers = internal.NewClientHandlers()
        return c, nil
 }
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 13339af..5cb1dd2 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -29,10 +29,9 @@ import (
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
 
-       log "github.com/sirupsen/logrus"
-
        "github.com/apache/pulsar-client-go/pulsar/internal"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 var (
@@ -80,7 +79,7 @@ type consumer struct {
        errorCh   chan error
        ticker    *time.Ticker
 
-       log *log.Entry
+       log log.Logger
 }
 
 func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
@@ -146,11 +145,11 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                }
        }
 
-       dlq, err := newDlqRouter(client, options.DLQ)
+       dlq, err := newDlqRouter(client, options.DLQ, client.log)
        if err != nil {
                return nil, err
        }
-       rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable)
+       rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable, 
client.log)
        if err != nil {
                return nil, err
        }
@@ -206,7 +205,7 @@ func newInternalConsumer(client *client, options 
ConsumerOptions, topic string,
                errorCh:                   make(chan error),
                dlq:                       dlq,
                rlq:                       rlq,
-               log:                       log.WithField("topic", topic),
+               log:                       
client.log.SubLogger(log.Fields{"topic": topic}),
                consumerName:              options.Name,
        }
 
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 27146c4..7705590 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -26,7 +26,7 @@ import (
 
        pkgerrors "github.com/pkg/errors"
 
-       log "github.com/sirupsen/logrus"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 type multiTopicConsumer struct {
@@ -42,7 +42,7 @@ type multiTopicConsumer struct {
        closeOnce sync.Once
        closeCh   chan struct{}
 
-       log *log.Entry
+       log log.Logger
 }
 
 func newMultiTopicConsumer(client *client, options ConsumerOptions, topics 
[]string,
@@ -54,7 +54,7 @@ func newMultiTopicConsumer(client *client, options 
ConsumerOptions, topics []str
                closeCh:      make(chan struct{}),
                dlq:          dlq,
                rlq:          rlq,
-               log:          log.WithField("topics", topics),
+               log:          client.log.SubLogger(log.Fields{"topic": topics}),
                consumerName: options.Name,
        }
 
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 882afd7..9f555d2 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -28,11 +28,10 @@ import (
 
        "github.com/gogo/protobuf/proto"
 
-       log "github.com/sirupsen/logrus"
-
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsar/internal/compression"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 var (
@@ -159,7 +158,7 @@ type partitionConsumer struct {
        nackTracker *negativeAcksTracker
        dlq         *dlqRouter
 
-       log *log.Entry
+       log log.Logger
 
        compressionProviders map[pb.CompressionType]compression.Provider
 }
@@ -185,16 +184,18 @@ func newPartitionConsumer(parent Consumer, client 
*client, options *partitionCon
                clearQueueCh:         make(chan func(id trackingMessageID)),
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
                dlq:                  dlq,
-               log:                  log.WithField("topic", options.topic),
        }
-       pc.log = pc.log.WithField("name", pc.name).
-               WithField("subscription", options.subscription).
-               WithField("consumerID", pc.consumerID)
-       pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)
+       pc.log = client.log.SubLogger(log.Fields{
+               "name":         pc.name,
+               "topic":        options.topic,
+               "subscription": options.subscription,
+               "consumerID":   pc.consumerID,
+       })
+       pc.nackTracker = newNegativeAcksTracker(pc, 
options.nackRedeliveryDelay, pc.log)
 
        err := pc.grabConn()
        if err != nil {
-               log.WithError(err).Errorf("Failed to create consumer")
+               pc.log.WithError(err).Error("Failed to create consumer")
                return nil, err
        }
        pc.log.Info("Created consumer")
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 13ef600..d7ca183 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -28,9 +28,8 @@ import (
 
        pkgerrors "github.com/pkg/errors"
 
-       log "github.com/sirupsen/logrus"
-
        "github.com/apache/pulsar-client-go/pulsar/internal"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 const (
@@ -59,7 +58,7 @@ type regexConsumer struct {
 
        ticker *time.Ticker
 
-       log *log.Entry
+       log log.Logger
 
        consumerName string
 }
@@ -82,7 +81,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn 
*internal.TopicName, p
 
                closeCh: make(chan struct{}),
 
-               log:          log.WithField("topic", tn.Name),
+               log:          c.log.SubLogger(log.Fields{"topic": tn.Name}),
                consumerName: opts.Name,
        }
 
@@ -280,13 +279,12 @@ func (c *regexConsumer) discover() {
        newTopics := topicsDiff(topics, known)
        staleTopics := topicsDiff(known, topics)
 
-       if log.GetLevel() == log.DebugLevel {
-               l := c.log.WithFields(log.Fields{
+       c.log.
+               WithFields(log.Fields{
                        "new_topics": newTopics,
                        "old_topics": staleTopics,
-               })
-               l.Debug("discover topics")
-       }
+               }).
+               Debug("discover topics")
 
        c.unsubscribeCh <- staleTopics
        c.subscribeCh <- newTopics
@@ -306,9 +304,7 @@ func (c *regexConsumer) knownTopics() []string {
 }
 
 func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq 
*retryRouter) {
-       if log.GetLevel() == log.DebugLevel {
-               c.log.WithField("topics", topics).Debug("subscribe")
-       }
+       c.log.WithField("topics", topics).Debug("subscribe")
        consumers := make(map[string]Consumer, len(topics))
        for ce := range subscriber(c.client, topics, c.options, c.messageCh, 
dlq, rlq) {
                if ce.err != nil {
@@ -326,11 +322,11 @@ func (c *regexConsumer) subscribe(topics []string, dlq 
*dlqRouter, rlq *retryRou
 }
 
 func (c *regexConsumer) unsubscribe(topics []string) {
-       if log.GetLevel() == log.DebugLevel {
-               c.log.WithField("topics", topics).Debug("unsubscribe")
-       }
+       c.log.WithField("topics", topics).Debug("unsubscribe")
+
        consumers := make(map[string]Consumer, len(topics))
        c.consumersLock.Lock()
+
        for _, t := range topics {
                if consumer, ok := c.consumers[t]; ok {
                        consumers[t] = consumer
@@ -340,7 +336,7 @@ func (c *regexConsumer) unsubscribe(topics []string) {
        c.consumersLock.Unlock()
 
        for t, consumer := range consumers {
-               log.Debugf("unsubscribe from topic=%s subscription=%s", t, 
c.options.SubscriptionName)
+               c.log.Debugf("unsubscribe from topic=%s subscription=%s", t, 
c.options.SubscriptionName)
                if err := consumer.Unsubscribe(); err != nil {
                        c.log.Warnf("unable to unsubscribe from topic=%s 
subscription=%s",
                                t, c.options.SubscriptionName)
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index e4acf5f..ed27373 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -28,6 +28,7 @@ import (
        "github.com/stretchr/testify/assert"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 func TestFilterTopics(t *testing.T) {
@@ -153,8 +154,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c 
Client, namespace string
                AutoDiscoveryPeriod: 5 * time.Minute,
        }
 
-       dlq, _ := newDlqRouter(c.(*client), nil)
-       rlq, _ := newRetryRouter(c.(*client), nil, false)
+       dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger())
+       rlq, _ := newRetryRouter(c.(*client), nil, false, 
log.DefaultNopLogger())
        consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, 
make(chan ConsumerMessage, 1), dlq, rlq)
        if err != nil {
                t.Fatal(err)
@@ -202,8 +203,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c 
Client, namespace string
                AutoDiscoveryPeriod: 5 * time.Minute,
        }
 
-       dlq, _ := newDlqRouter(c.(*client), nil)
-       rlq, _ := newRetryRouter(c.(*client), nil, false)
+       dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger())
+       rlq, _ := newRetryRouter(c.(*client), nil, false, 
log.DefaultNopLogger())
        consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, 
make(chan ConsumerMessage, 1), dlq, rlq)
        if err != nil {
                t.Fatal(err)
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 6aa7e2b..b4d98b0 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -23,7 +23,7 @@ import (
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
-       log "github.com/sirupsen/logrus"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 type dlqRouter struct {
@@ -32,13 +32,14 @@ type dlqRouter struct {
        policy    *DLQPolicy
        messageCh chan ConsumerMessage
        closeCh   chan interface{}
-       log       *log.Entry
+       log       log.Logger
 }
 
-func newDlqRouter(client Client, policy *DLQPolicy) (*dlqRouter, error) {
+func newDlqRouter(client Client, policy *DLQPolicy, logger log.Logger) 
(*dlqRouter, error) {
        r := &dlqRouter{
                client: client,
                policy: policy,
+               log:    logger,
        }
 
        if policy != nil {
@@ -52,7 +53,7 @@ func newDlqRouter(client Client, policy *DLQPolicy) 
(*dlqRouter, error) {
 
                r.messageCh = make(chan ConsumerMessage)
                r.closeCh = make(chan interface{}, 1)
-               r.log = log.WithField("dlq-topic", policy.DeadLetterTopic)
+               r.log = logger.SubLogger(log.Fields{"dlq-topic": 
policy.DeadLetterTopic})
                go r.run()
        }
        return r, nil
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index fe34f0c..81dda2d 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -20,11 +20,11 @@ package internal
 import (
        "time"
 
-       "github.com/apache/pulsar-client-go/pulsar/internal/compression"
-       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/gogo/protobuf/proto"
 
-       log "github.com/sirupsen/logrus"
+       "github.com/apache/pulsar-client-go/pulsar/internal/compression"
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 const (
@@ -63,12 +63,14 @@ type BatchBuilder struct {
 
        compressionProvider compression.Provider
        buffersPool         BuffersPool
+
+       log log.Logger
 }
 
 // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a 
new batch message container.
 func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, 
producerID uint64,
        compressionType pb.CompressionType, level compression.Level,
-       bufferPool BuffersPool) (*BatchBuilder, error) {
+       bufferPool BuffersPool, logger log.Logger) (*BatchBuilder, error) {
        if maxMessages == 0 {
                maxMessages = DefaultMaxMessagesPerBatch
        }
@@ -92,6 +94,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, 
producerName string, p
                callbacks:           []interface{}{},
                compressionProvider: getCompressionProvider(compressionType, 
level),
                buffersPool:         bufferPool,
+               log:                 logger,
        }
 
        if compressionType != pb.CompressionType_NONE {
@@ -162,7 +165,7 @@ func (bb *BatchBuilder) Flush() (batchData Buffer, 
sequenceID uint64, callbacks
                // No-Op for empty batch
                return nil, 0, nil
        }
-       log.Debug("BatchBuilder flush: messages: ", bb.numMessages)
+       bb.log.Debug("BatchBuilder flush: messages: ", bb.numMessages)
 
        bb.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bb.numMessages))
        bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))
@@ -198,7 +201,6 @@ func getCompressionProvider(compressionType 
pb.CompressionType,
        case pb.CompressionType_ZSTD:
                return compression.NewZStdProvider(level)
        default:
-               log.Panic("unsupported compression type")
-               return nil
+               panic("unsupported compression type")
        }
 }
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index c32262e..2536bae 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -21,12 +21,9 @@ import (
        "errors"
        "fmt"
 
-       "github.com/apache/pulsar-client-go/pulsar/internal/compression"
-
        "github.com/gogo/protobuf/proto"
 
-       log "github.com/sirupsen/logrus"
-
+       "github.com/apache/pulsar-client-go/pulsar/internal/compression"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 )
 
@@ -196,7 +193,7 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg 
proto.Message) *pb.BaseCommand
        case pb.BaseCommand_AUTH_RESPONSE:
                cmd.AuthResponse = msg.(*pb.CommandAuthResponse)
        default:
-               log.Panic("Missing command type: ", cmdType)
+               panic(fmt.Sprintf("Missing command type: %v", cmdType))
        }
 
        return cmd
@@ -209,7 +206,7 @@ func addSingleMessageToBatch(wb Buffer, smm 
*pb.SingleMessageMetadata, payload [
        wb.ResizeIfNeeded(metadataSize)
        _, err := smm.MarshalToSizedBuffer(wb.WritableSlice()[:metadataSize])
        if err != nil {
-               log.WithError(err).Fatal("Protobuf serialization error")
+               panic(fmt.Sprintf("Protobuf serialization error: %v", err))
        }
 
        wb.WrittenBytes(metadataSize)
@@ -235,7 +232,7 @@ func serializeBatch(wb Buffer,
        wb.ResizeIfNeeded(cmdSize)
        _, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
        if err != nil {
-               log.WithError(err).Fatal("Protobuf error when serializing 
cmdSend")
+               panic(fmt.Sprintf("Protobuf error when serializing cmdSend: 
%v", err))
        }
        wb.WrittenBytes(cmdSize)
 
@@ -250,7 +247,7 @@ func serializeBatch(wb Buffer,
        wb.ResizeIfNeeded(msgMetadataSize)
        _, err = 
msgMetadata.MarshalToSizedBuffer(wb.WritableSlice()[:msgMetadataSize])
        if err != nil {
-               log.WithError(err).Fatal("Protobuf error when serializing 
msgMetadata")
+               panic(fmt.Sprintf("Protobuf error when serializing msgMetadata: 
%v", err))
        }
        wb.WrittenBytes(msgMetadataSize)
 
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1905d17..aff263b 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -33,10 +33,10 @@ import (
        "github.com/prometheus/client_golang/prometheus/promauto"
 
        "github.com/gogo/protobuf/proto"
-       log "github.com/sirupsen/logrus"
 
        "github.com/apache/pulsar-client-go/pulsar/internal/auth"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 const (
@@ -167,7 +167,7 @@ type connection struct {
        pingTicker           *time.Ticker
        pingCheckTicker      *time.Ticker
 
-       log *log.Entry
+       log log.Logger
 
        requestIDGenerator uint64
 
@@ -189,21 +189,30 @@ type connection struct {
        maxMessageSize int32
 }
 
-func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions 
*TLSOptions,
-       connectionTimeout time.Duration, auth auth.Provider) *connection {
+// connectionOptions defines configurations for creating connection.
+type connectionOptions struct {
+       logicalAddr       *url.URL
+       physicalAddr      *url.URL
+       tls               *TLSOptions
+       connectionTimeout time.Duration
+       auth              auth.Provider
+       logger            log.Logger
+}
+
+func newConnection(opts connectionOptions) *connection {
        cnx := &connection{
                state:                int32(connectionInit),
-               connectionTimeout:    connectionTimeout,
-               logicalAddr:          logicalAddr,
-               physicalAddr:         physicalAddr,
+               connectionTimeout:    opts.connectionTimeout,
+               logicalAddr:          opts.logicalAddr,
+               physicalAddr:         opts.physicalAddr,
                writeBuffer:          NewBuffer(4096),
-               log:                  log.WithField("remote_addr", 
physicalAddr),
+               log:                  
opts.logger.SubLogger(log.Fields{"remote_addr": opts.physicalAddr}),
                pendingReqs:          make(map[uint64]*request),
                lastDataReceivedTime: time.Now(),
                pingTicker:           time.NewTicker(keepAliveInterval),
                pingCheckTicker:      time.NewTicker(keepAliveInterval),
-               tlsOptions:           tlsOptions,
-               auth:                 auth,
+               tlsOptions:           opts.tls,
+               auth:                 opts.auth,
 
                closeCh:            make(chan interface{}),
                incomingRequestsCh: make(chan *request, 10),
@@ -273,7 +282,7 @@ func (c *connection) connect() bool {
 
        c.Lock()
        c.cnx = cnx
-       c.log = c.log.WithField("local_addr", c.cnx.LocalAddr())
+       c.log = c.log.SubLogger(log.Fields{"local_addr": c.cnx.LocalAddr()})
        c.log.Info("TCP connection established")
        c.Unlock()
 
@@ -473,7 +482,8 @@ func (c *connection) writeCommand(cmd *pb.BaseCommand) {
        c.writeBuffer.ResizeIfNeeded(cmdSize)
        _, err := 
cmd.MarshalToSizedBuffer(c.writeBuffer.WritableSlice()[:cmdSize])
        if err != nil {
-               c.log.WithError(err).Fatal("Protobuf serialization error")
+               c.log.WithError(err).Error("Protobuf serialization error")
+               panic("Protobuf serialization error")
        }
 
        c.writeBuffer.WrittenBytes(cmdSize)
@@ -628,7 +638,10 @@ func (c *connection) handleMessage(response 
*pb.CommandMessage, payload Buffer)
        if consumer, ok := c.consumerHandler(consumerID); ok {
                err := consumer.MessageReceived(response, payload)
                if err != nil {
-                       c.log.WithField("consumerID", 
consumerID).WithError(err).Error("handle message Id: ", response.MessageId)
+                       c.log.
+                               WithError(err).
+                               WithField("consumerID", consumerID).
+                               Error("handle message Id: ", response.MessageId)
                }
        } else {
                c.log.WithField("consumerID", consumerID).Warn("Got unexpected 
message: ", response.MessageId)
diff --git a/pulsar/internal/connection_pool.go 
b/pulsar/internal/connection_pool.go
index a90ac42..133b280 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -25,8 +25,7 @@ import (
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal/auth"
-
-       log "github.com/sirupsen/logrus"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 // ConnectionPool is a interface of connection pool.
@@ -45,6 +44,8 @@ type connectionPool struct {
        auth                  auth.Provider
        maxConnectionsPerHost int32
        roundRobinCnt         int32
+
+       log log.Logger
 }
 
 // NewConnectionPool init connection pool.
@@ -52,12 +53,14 @@ func NewConnectionPool(
        tlsOptions *TLSOptions,
        auth auth.Provider,
        connectionTimeout time.Duration,
-       maxConnectionsPerHost int) ConnectionPool {
+       maxConnectionsPerHost int,
+       logger log.Logger) ConnectionPool {
        return &connectionPool{
                tlsOptions:            tlsOptions,
                auth:                  auth,
                connectionTimeout:     connectionTimeout,
                maxConnectionsPerHost: int32(maxConnectionsPerHost),
+               log:                   logger,
        }
 }
 
@@ -66,7 +69,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL, 
physicalAddr *url.U
        cachedCnx, found := p.pool.Load(key)
        if found {
                cnx := cachedCnx.(*connection)
-               log.Debug("Found connection in cache:", cnx.logicalAddr, 
cnx.physicalAddr)
+               p.log.Debug("Found connection in cache:", cnx.logicalAddr, 
cnx.physicalAddr)
 
                if err := cnx.waitUntilReady(); err == nil {
                        // Connection is ready to be used
@@ -74,11 +77,18 @@ func (p *connectionPool) GetConnection(logicalAddr 
*url.URL, physicalAddr *url.U
                }
                // The cached connection is failed
                p.pool.Delete(key)
-               log.Debug("Removed failed connection from pool:", 
cnx.logicalAddr, cnx.physicalAddr)
+               p.log.Debug("Removed failed connection from pool:", 
cnx.logicalAddr, cnx.physicalAddr)
        }
 
        // Try to create a new connection
-       newConnection := newConnection(logicalAddr, physicalAddr, p.tlsOptions, 
p.connectionTimeout, p.auth)
+       newConnection := newConnection(connectionOptions{
+               logicalAddr:       logicalAddr,
+               physicalAddr:      physicalAddr,
+               tls:               p.tlsOptions,
+               connectionTimeout: p.connectionTimeout,
+               auth:              p.auth,
+               logger:            p.log,
+       })
        newCnx, wasCached := p.pool.LoadOrStore(key, newConnection)
        cnx := newCnx.(*connection)
 
diff --git a/pulsar/internal/lookup_service.go 
b/pulsar/internal/lookup_service.go
index 1673f4d..00f54ef 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -22,13 +22,12 @@ import (
        "fmt"
        "net/url"
 
+       "github.com/gogo/protobuf/proto"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
 
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
-       "github.com/gogo/protobuf/proto"
-
-       log "github.com/sirupsen/logrus"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 var (
@@ -55,14 +54,17 @@ type lookupService struct {
        rpcClient  RPCClient
        serviceURL *url.URL
        tlsEnabled bool
+       log        log.Logger
 }
 
 // NewLookupService init a lookup service struct and return an object of 
LookupService.
-func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, tlsEnabled 
bool) LookupService {
+func NewLookupService(rpcClient RPCClient, serviceURL *url.URL,
+       tlsEnabled bool, logger log.Logger) LookupService {
        return &lookupService{
                rpcClient:  rpcClient,
                serviceURL: serviceURL,
                tlsEnabled: tlsEnabled,
+               log:        logger.SubLogger(log.Fields{"serviceURL": 
serviceURL}),
        }
 }
 
@@ -102,7 +104,7 @@ func (ls *lookupService) Lookup(topic string) 
(*LookupResult, error) {
        if err != nil {
                return nil, err
        }
-       log.Debugf("Got topic{%s} lookup response: %+v", topic, res)
+       ls.log.Debugf("Got topic{%s} lookup response: %+v", topic, res)
 
        for i := 0; i < lookupResultMaxRedirect; i++ {
                lr := res.Response.LookupTopicResponse
@@ -114,7 +116,7 @@ func (ls *lookupService) Lookup(topic string) 
(*LookupResult, error) {
                                return nil, err
                        }
 
-                       log.Debugf("Follow topic{%s} redirect to broker. %v / 
%v - Use proxy: %v",
+                       ls.log.Debugf("Follow topic{%s} redirect to broker. %v 
/ %v - Use proxy: %v",
                                topic, lr.BrokerServiceUrl, 
lr.BrokerServiceUrlTls, lr.ProxyThroughServiceUrl)
 
                        id := ls.rpcClient.NewRequestID()
@@ -131,7 +133,7 @@ func (ls *lookupService) Lookup(topic string) 
(*LookupResult, error) {
                        continue
 
                case pb.CommandLookupTopicResponse_Connect:
-                       log.Debugf("Successfully looked up topic{%s} on broker. 
%s / %s - Use proxy: %t",
+                       ls.log.Debugf("Successfully looked up topic{%s} on 
broker. %s / %s - Use proxy: %t",
                                topic, lr.GetBrokerServiceUrl(), 
lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())
 
                        logicalAddress, physicalAddress, err := 
ls.getBrokerAddress(lr)
@@ -149,7 +151,7 @@ func (ls *lookupService) Lookup(topic string) 
(*LookupResult, error) {
                        if lr.Error != nil {
                                errorMsg = lr.Error.String()
                        }
-                       log.Warnf("Failed to lookup topic: %s, error msg: %s", 
topic, errorMsg)
+                       ls.log.Warnf("Failed to lookup topic: %s, error msg: 
%s", topic, errorMsg)
                        return nil, fmt.Errorf("failed to lookup topic: %s", 
errorMsg)
                }
        }
diff --git a/pulsar/internal/lookup_service_test.go 
b/pulsar/internal/lookup_service_test.go
index 1eead50..fa22f18 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -21,9 +21,11 @@ import (
        "net/url"
        "testing"
 
-       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/gogo/protobuf/proto"
        "github.com/stretchr/testify/assert"
+
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 type mockedRPCClient struct {
@@ -127,7 +129,7 @@ func TestLookupSuccess(t *testing.T) {
                                BrokerServiceUrl: 
proto.String("pulsar://broker-1:6650"),
                        },
                },
-       }, url, false)
+       }, url, false, log.DefaultNopLogger())
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -159,7 +161,7 @@ func TestTlsLookupSuccess(t *testing.T) {
                                BrokerServiceUrlTls: 
proto.String("pulsar+ssl://broker-1:6651"),
                        },
                },
-       }, url, true)
+       }, url, true, log.DefaultNopLogger())
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -192,7 +194,7 @@ func TestLookupWithProxy(t *testing.T) {
                                ProxyThroughServiceUrl: proto.Bool(true),
                        },
                },
-       }, url, false)
+       }, url, false, log.DefaultNopLogger())
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -225,7 +227,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
                                ProxyThroughServiceUrl: proto.Bool(true),
                        },
                },
-       }, url, true)
+       }, url, true, log.DefaultNopLogger())
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -269,7 +271,7 @@ func TestLookupWithRedirect(t *testing.T) {
                                BrokerServiceUrl: 
proto.String("pulsar://broker-1:6650"),
                        },
                },
-       }, url, false)
+       }, url, false, log.DefaultNopLogger())
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -313,7 +315,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
                                BrokerServiceUrlTls: 
proto.String("pulsar+ssl://broker-1:6651"),
                        },
                },
-       }, url, true)
+       }, url, true, log.DefaultNopLogger())
 
        lr, err := ls.Lookup("my-topic")
        assert.NoError(t, err)
@@ -346,7 +348,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
                                ProxyThroughServiceUrl: proto.Bool(false),
                        },
                },
-       }, url, false)
+       }, url, false, log.DefaultNopLogger())
 
        lr, err := ls.Lookup("my-topic")
        assert.Error(t, err)
@@ -374,7 +376,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
                                Authoritative: proto.Bool(true),
                        },
                },
-       }, url, false)
+       }, url, false, log.DefaultNopLogger())
 
        lr, err := ls.Lookup("my-topic")
        assert.Error(t, err)
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 0d4cc12..c7d810a 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -27,10 +27,10 @@ import (
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
 
+       "github.com/apache/pulsar-client-go/pulsar/log"
+
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/gogo/protobuf/proto"
-
-       log "github.com/sirupsen/logrus"
 )
 
 var (
@@ -71,16 +71,16 @@ type rpcClient struct {
        requestIDGenerator  uint64
        producerIDGenerator uint64
        consumerIDGenerator uint64
-
-       log *log.Entry
+       log                 log.Logger
 }
 
-func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout 
time.Duration) RPCClient {
+func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
+       requestTimeout time.Duration, logger log.Logger) RPCClient {
        return &rpcClient{
                serviceURL:     serviceURL,
                pool:           pool,
                requestTimeout: requestTimeout,
-               log:            log.WithField("serviceURL", serviceURL),
+               log:            logger.SubLogger(log.Fields{"serviceURL": 
serviceURL}),
        }
 }
 
diff --git a/pulsar/log/log.go b/pulsar/log/log.go
new file mode 100644
index 0000000..7ed5231
--- /dev/null
+++ b/pulsar/log/log.go
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package log
+
+// DefaultNopLogger returns a nop logger.
+func DefaultNopLogger() Logger {
+       return nopLogger{}
+}
+
+type nopLogger struct{}
+
+func (l nopLogger) SubLogger(fields Fields) Logger                 { return l }
+func (l nopLogger) WithFields(fields Fields) Entry                 { return 
nopEntry{} }
+func (l nopLogger) WithField(name string, value interface{}) Entry { return 
nopEntry{} }
+func (l nopLogger) WithError(err error) Entry                      { return 
nopEntry{} }
+func (l nopLogger) Debug(args ...interface{})                      {}
+func (l nopLogger) Info(args ...interface{})                       {}
+func (l nopLogger) Warn(args ...interface{})                       {}
+func (l nopLogger) Error(args ...interface{})                      {}
+func (l nopLogger) Debugf(format string, args ...interface{})      {}
+func (l nopLogger) Infof(format string, args ...interface{})       {}
+func (l nopLogger) Warnf(format string, args ...interface{})       {}
+func (l nopLogger) Errorf(format string, args ...interface{})      {}
+
+type nopEntry struct{}
+
+func (e nopEntry) WithFields(fields Fields) Entry                 { return 
nopEntry{} }
+func (e nopEntry) WithField(name string, value interface{}) Entry { return 
nopEntry{} }
+
+func (e nopEntry) Debug(args ...interface{})                 {}
+func (e nopEntry) Info(args ...interface{})                  {}
+func (e nopEntry) Warn(args ...interface{})                  {}
+func (e nopEntry) Error(args ...interface{})                 {}
+func (e nopEntry) Debugf(format string, args ...interface{}) {}
+func (e nopEntry) Infof(format string, args ...interface{})  {}
+func (e nopEntry) Warnf(format string, args ...interface{})  {}
+func (e nopEntry) Errorf(format string, args ...interface{}) {}
diff --git a/pulsar/log/logger.go b/pulsar/log/logger.go
new file mode 100644
index 0000000..693416c
--- /dev/null
+++ b/pulsar/log/logger.go
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package log defines the logger interfaces used by pulsar client.
+// Users can leverage these interfaces to provide a customized logger
+// implementation.
+//
+// The Logger and Entry interfaces defined here are inspired
+// by sirupsen/logrus, both logrus and zap logging libraries
+// are good resources to learn how to implement an effective
+// logging library.
+//
+// Besides the interfaces, this log library also provides an
+// implementation based on logrus, and a No-op one as well.
+package log
+
+// Fields type, used to pass to `WithFields`.
+type Fields map[string]interface{}
+
+// Logger describes the interface that must be implemented by all loggers.
+type Logger interface {
+       SubLogger(fields Fields) Logger
+
+       WithFields(fields Fields) Entry
+       WithField(name string, value interface{}) Entry
+       WithError(err error) Entry
+
+       Debug(args ...interface{})
+       Info(args ...interface{})
+       Warn(args ...interface{})
+       Error(args ...interface{})
+
+       Debugf(format string, args ...interface{})
+       Infof(format string, args ...interface{})
+       Warnf(format string, args ...interface{})
+       Errorf(format string, args ...interface{})
+}
+
+// Entry describes the interface for the logger entry.
+type Entry interface {
+       WithFields(fields Fields) Entry
+       WithField(name string, value interface{}) Entry
+
+       Debug(args ...interface{})
+       Info(args ...interface{})
+       Warn(args ...interface{})
+       Error(args ...interface{})
+
+       Debugf(format string, args ...interface{})
+       Infof(format string, args ...interface{})
+       Warnf(format string, args ...interface{})
+       Errorf(format string, args ...interface{})
+}
diff --git a/pulsar/log/wrapper_logrus.go b/pulsar/log/wrapper_logrus.go
new file mode 100644
index 0000000..efcf1e9
--- /dev/null
+++ b/pulsar/log/wrapper_logrus.go
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package log
+
+import (
+       "github.com/sirupsen/logrus"
+)
+
+// logrusWrapper implements Logger interface
+// based on underlying logrus.FieldLogger
+type logrusWrapper struct {
+       l logrus.FieldLogger
+}
+
+// NewLoggerWithLogrus creates a new logger which wraps
+// the given logrus.Logger
+func NewLoggerWithLogrus(logger *logrus.Logger) Logger {
+       return &logrusWrapper{
+               l: logger,
+       }
+}
+
+func (l *logrusWrapper) SubLogger(fs Fields) Logger {
+       return &logrusWrapper{
+               l: l.l.WithFields(logrus.Fields(fs)),
+       }
+}
+
+func (l *logrusWrapper) WithFields(fs Fields) Entry {
+       return logrusEntry{
+               e: l.l.WithFields(logrus.Fields(fs)),
+       }
+}
+
+func (l *logrusWrapper) WithField(name string, value interface{}) Entry {
+       return logrusEntry{
+               e: l.l.WithField(name, value),
+       }
+}
+
+func (l *logrusWrapper) WithError(err error) Entry {
+       return logrusEntry{
+               e: l.l.WithError(err),
+       }
+}
+
+func (l *logrusWrapper) Debug(args ...interface{}) {
+       l.l.Debug(args...) // spread args; passing the slice itself would log "[a b]"
+}
+
+func (l *logrusWrapper) Info(args ...interface{}) {
+       l.l.Info(args...)
+}
+
+func (l *logrusWrapper) Warn(args ...interface{}) {
+       l.l.Warn(args...)
+}
+
+func (l *logrusWrapper) Error(args ...interface{}) {
+       l.l.Error(args...)
+}
+
+func (l *logrusWrapper) Debugf(format string, args ...interface{}) {
+       l.l.Debugf(format, args...) // spread args so each value binds to its own verb
+}
+
+func (l *logrusWrapper) Infof(format string, args ...interface{}) {
+       l.l.Infof(format, args...)
+}
+
+func (l *logrusWrapper) Warnf(format string, args ...interface{}) {
+       l.l.Warnf(format, args...)
+}
+
+func (l *logrusWrapper) Errorf(format string, args ...interface{}) {
+       l.l.Errorf(format, args...)
+}
+
+type logrusEntry struct {
+       e logrus.FieldLogger
+}
+
+func (l logrusEntry) WithFields(fs Fields) Entry {
+       return logrusEntry{
+               e: l.e.WithFields(logrus.Fields(fs)),
+       }
+}
+
+func (l logrusEntry) WithField(name string, value interface{}) Entry {
+       return logrusEntry{
+               e: l.e.WithField(name, value),
+       }
+}
+
+func (l logrusEntry) Debug(args ...interface{}) {
+       l.e.Debug(args...) // spread args; passing the slice itself would log "[a b]"
+}
+
+func (l logrusEntry) Info(args ...interface{}) {
+       l.e.Info(args...)
+}
+
+func (l logrusEntry) Warn(args ...interface{}) {
+       l.e.Warn(args...)
+}
+
+func (l logrusEntry) Error(args ...interface{}) {
+       l.e.Error(args...)
+}
+
+func (l logrusEntry) Debugf(format string, args ...interface{}) {
+       l.e.Debugf(format, args...) // spread args so each value binds to its own verb
+}
+
+func (l logrusEntry) Infof(format string, args ...interface{}) {
+       l.e.Infof(format, args...)
+}
+
+func (l logrusEntry) Warnf(format string, args ...interface{}) {
+       l.e.Warnf(format, args...)
+}
+
+func (l logrusEntry) Errorf(format string, args ...interface{}) {
+       l.e.Errorf(format, args...)
+}
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index a7dc88e..23a8a91 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -21,7 +21,7 @@ import (
        "sync"
        "time"
 
-       log "github.com/sirupsen/logrus"
+       log "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 type redeliveryConsumer interface {
@@ -36,15 +36,17 @@ type negativeAcksTracker struct {
        rc           redeliveryConsumer
        tick         *time.Ticker
        delay        time.Duration
+       log          log.Logger
 }
 
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration) 
*negativeAcksTracker {
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger 
log.Logger) *negativeAcksTracker {
        t := &negativeAcksTracker{
                doneCh:       make(chan interface{}),
                negativeAcks: make(map[messageID]time.Time),
                rc:           rc,
                tick:         time.NewTicker(delay / 3),
                delay:        delay,
+               log:          logger,
        }
 
        go t.track()
@@ -77,7 +79,7 @@ func (t *negativeAcksTracker) track() {
        for {
                select {
                case <-t.doneCh:
-                       log.Debug("Closing nack tracker")
+                       t.log.Debug("Closing nack tracker")
                        return
 
                case <-t.tick.C:
@@ -87,9 +89,9 @@ func (t *negativeAcksTracker) track() {
                                now := time.Now()
                                msgIds := make([]messageID, 0)
                                for msgID, targetTime := range t.negativeAcks {
-                                       log.Debugf("MsgId: %v -- targetTime: %v 
-- now: %v", msgID, targetTime, now)
+                                       t.log.Debugf("MsgId: %v -- targetTime: 
%v -- now: %v", msgID, targetTime, now)
                                        if targetTime.Before(now) {
-                                               log.Debugf("Adding MsgId: %v", 
msgID)
+                                               t.log.Debugf("Adding MsgId: 
%v", msgID)
                                                msgIds = append(msgIds, msgID)
                                                delete(t.negativeAcks, msgID)
                                        }
diff --git a/pulsar/negative_acks_tracker_test.go 
b/pulsar/negative_acks_tracker_test.go
index 3930b7f..27e07d8 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -23,6 +23,7 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/log"
        "github.com/stretchr/testify/assert"
 )
 
@@ -74,7 +75,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID {
 
 func TestNacksTracker(t *testing.T) {
        nmc := newNackMockedConsumer()
-       nacks := newNegativeAcksTracker(nmc, testNackDelay)
+       nacks := newNegativeAcksTracker(nmc, testNackDelay, 
log.DefaultNopLogger())
 
        nacks.Add(messageID{
                ledgerID: 1,
@@ -105,7 +106,7 @@ func TestNacksTracker(t *testing.T) {
 
 func TestNacksWithBatchesTracker(t *testing.T) {
        nmc := newNackMockedConsumer()
-       nacks := newNegativeAcksTracker(nmc, testNackDelay)
+       nacks := newNegativeAcksTracker(nmc, testNackDelay, 
log.DefaultNopLogger())
 
        nacks.Add(messageID{
                ledgerID: 1,
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 458d2b5..25499ce 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -27,9 +27,8 @@ import (
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
 
-       log "github.com/sirupsen/logrus"
-
        "github.com/apache/pulsar-client-go/pulsar/internal"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 var (
@@ -60,8 +59,7 @@ type producer struct {
        messageRouter func(*ProducerMessage, TopicMetadata) int
        ticker        *time.Ticker
        tickerStop    chan struct{}
-
-       log *log.Entry
+       log           log.Logger
 }
 
 const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
@@ -88,7 +86,7 @@ func newProducer(client *client, options *ProducerOptions) 
(*producer, error) {
                options: options,
                topic:   options.Topic,
                client:  client,
-               log:     log.WithField("topic", options.Topic),
+               log:     client.log.SubLogger(log.Fields{"topic": 
options.Topic}),
        }
 
        var batchingMaxPublishDelay time.Duration
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 870da2e..e8cf0f7 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -31,10 +31,9 @@ import (
 
        "github.com/gogo/protobuf/proto"
 
-       log "github.com/sirupsen/logrus"
-
        "github.com/apache/pulsar-client-go/pulsar/internal"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 const (
@@ -89,7 +88,7 @@ type partitionProducer struct {
        state  int32
        client *client
        topic  string
-       log    *log.Entry
+       log    log.Logger
        cnx    internal.Connection
 
        options             *ProducerOptions
@@ -125,11 +124,12 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
                maxPendingMessages = options.MaxPendingMessages
        }
 
+       logger := client.log.SubLogger(log.Fields{"topic": topic})
        p := &partitionProducer{
                state:            producerInit,
-               log:              log.WithField("topic", topic),
                client:           client,
                topic:            topic,
+               log:              logger,
                options:          options,
                producerID:       client.rpcClient.NewProducerID(),
                eventsChan:       make(chan interface{}, maxPendingMessages),
@@ -146,12 +146,15 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
 
        err := p.grabCnx()
        if err != nil {
-               log.WithError(err).Errorf("Failed to create producer")
+               logger.WithError(err).Error("Failed to create producer")
                return nil, err
        }
 
-       p.log = p.log.WithField("producer_name", p.producerName).
-               WithField("producerID", p.producerID)
+       p.log = p.log.SubLogger(log.Fields{
+               "producer_name": p.producerName,
+               "producerID":    p.producerID,
+       })
+
        p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
        atomic.StoreInt32(&p.state, producerReady)
 
@@ -195,7 +198,8 @@ func (p *partitionProducer) grabCnx() error {
                p.batchBuilder, err = 
internal.NewBatchBuilder(p.options.BatchingMaxMessages, 
p.options.BatchingMaxSize,
                        p.producerName, p.producerID, 
pb.CompressionType(p.options.CompressionType),
                        compression.Level(p.options.CompressionLevel),
-                       p)
+                       p,
+                       p.log)
                if err != nil {
                        return err
                }
@@ -295,9 +299,10 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
        if len(msg.Payload) > int(p.cnx.GetMaxMessageSize()) {
                p.publishSemaphore.Release()
                request.callback(nil, request.msg, errMessageTooLarge)
-               p.log.WithField("size", len(msg.Payload)).
+               p.log.WithError(errMessageTooLarge).
+                       WithField("size", len(msg.Payload)).
                        WithField("properties", msg.Properties).
-                       WithError(errMessageTooLarge).Error()
+                       Error()
                publishErrors.Inc()
                return
        }
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 17a7084..89e9063 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -26,7 +26,7 @@ import (
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
 
-       log "github.com/sirupsen/logrus"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 const (
@@ -50,8 +50,7 @@ type reader struct {
        pc                  *partitionConsumer
        messageCh           chan ConsumerMessage
        lastMessageInBroker trackingMessageID
-
-       log *log.Entry
+       log                 log.Logger
 }
 
 func newReader(client *client, options ReaderOptions) (Reader, error) {
@@ -106,11 +105,11 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
 
        reader := &reader{
                messageCh: make(chan ConsumerMessage),
-               log:       log.WithField("topic", options.Topic),
+               log:       client.log.SubLogger(log.Fields{"topic": 
options.Topic}),
        }
 
        // Provide dummy dlq router with not dlq policy
-       dlq, err := newDlqRouter(client, nil)
+       dlq, err := newDlqRouter(client, nil, client.log)
        if err != nil {
                return nil, err
        }
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index b16417d..3eb12f4 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -23,7 +23,7 @@ import (
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
-       log "github.com/sirupsen/logrus"
+       "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
 const (
@@ -49,13 +49,14 @@ type retryRouter struct {
        policy    *DLQPolicy
        messageCh chan RetryMessage
        closeCh   chan interface{}
-       log       *log.Entry
+       log       log.Logger
 }
 
-func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool) 
(*retryRouter, error) {
+func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool, 
logger log.Logger) (*retryRouter, error) {
        r := &retryRouter{
                client: client,
                policy: policy,
+               log:    logger,
        }
 
        if policy != nil && retryEnabled {
@@ -69,7 +70,7 @@ func newRetryRouter(client Client, policy *DLQPolicy, 
retryEnabled bool) (*retry
 
                r.messageCh = make(chan RetryMessage)
                r.closeCh = make(chan interface{}, 1)
-               r.log = log.WithField("rlq-topic", policy.RetryLetterTopic)
+               r.log = logger.SubLogger(log.Fields{"rlq-topic": 
policy.RetryLetterTopic})
                go r.run()
        }
        return r, nil

Reply via email to