This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 3ab75cd define logger interface and add Logger field to ClientOptions
(#323)
3ab75cd is described below
commit 3ab75cdefb8758efbced97339621f13c7b4d4b15
Author: Shohi Wang <[email protected]>
AuthorDate: Fri Oct 9 11:08:08 2020 +0800
define logger interface and add Logger field to ClientOptions (#323)
### Motivation
enable users to configure the logger used by the client and use their own
implementation. If no logger is provided, a wrapped `logrus.StandardLogger()`
will be used. <s>This PR only solved part of the problem mentioned in the issue
https://github.com/apache/pulsar-client-go/issues/228.</s>
### Modifications
* define `Logger` and `Entry` interfaces used by the client
* add `Logger` field to ClientOptions
* add `logger` field to internal structures
* provide a logger implementation backed by logrus
* implement a no-op logger
---
pulsar/client.go | 7 ++
pulsar/client_impl.go | 21 +++--
pulsar/consumer_impl.go | 11 ++-
pulsar/consumer_multitopic.go | 6 +-
pulsar/consumer_partition.go | 19 ++---
pulsar/consumer_regex.go | 28 +++----
pulsar/consumer_regex_test.go | 9 ++-
pulsar/dlq_router.go | 9 ++-
pulsar/internal/batch_builder.go | 16 ++--
pulsar/internal/commands.go | 13 ++-
pulsar/internal/connection.go | 39 ++++++---
pulsar/internal/connection_pool.go | 22 ++++--
pulsar/internal/lookup_service.go | 18 +++--
pulsar/internal/lookup_service_test.go | 20 ++---
pulsar/internal/rpc_client.go | 12 +--
pulsar/log/log.go | 52 ++++++++++++
pulsar/log/logger.go | 67 ++++++++++++++++
pulsar/log/wrapper_logrus.go | 140 +++++++++++++++++++++++++++++++++
pulsar/negative_acks_tracker.go | 12 +--
pulsar/negative_acks_tracker_test.go | 5 +-
pulsar/producer_impl.go | 8 +-
pulsar/producer_partition.go | 25 +++---
pulsar/reader_impl.go | 9 +--
pulsar/retry_router.go | 9 ++-
24 files changed, 442 insertions(+), 135 deletions(-)
diff --git a/pulsar/client.go b/pulsar/client.go
index 2253607..ce9db91 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -22,6 +22,7 @@ import (
"time"
"github.com/apache/pulsar-client-go/pulsar/internal/auth"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
func NewClient(options ClientOptions) (Client, error) {
@@ -102,6 +103,12 @@ type ClientOptions struct {
// Max number of connections to a single broker that will kept in the
pool. (Default: 1 connection)
MaxConnectionsPerBroker int
+
+ // Configure the logger used by the client.
+ // By default, a wrapped logrus.StandardLogger will be used, namely,
+ // log.NewLoggerWithLogrus(logrus.StandardLogger())
+ // FIXME: use `logger` as internal field name instead of `log` as it's
more idiomatic
+ Logger log.Logger
}
type Client interface {
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index e0d6f47..20939c9 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -25,11 +25,12 @@ import (
"github.com/gogo/protobuf/proto"
- log "github.com/sirupsen/logrus"
+ "github.com/sirupsen/logrus"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/auth"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
const (
@@ -42,16 +43,25 @@ type client struct {
rpcClient internal.RPCClient
handlers internal.ClientHandlers
lookupService internal.LookupService
+
+ log log.Logger
}
func newClient(options ClientOptions) (Client, error) {
+ var logger log.Logger
+ if options.Logger != nil {
+ logger = options.Logger
+ } else {
+ logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
+ }
+
if options.URL == "" {
return nil, newError(ResultInvalidConfiguration, "URL is
required for client")
}
url, err := url.Parse(options.URL)
if err != nil {
- log.WithError(err).Error("Failed to parse service URL")
+ logger.WithError(err).Error("Failed to parse service URL")
return nil, newError(ResultInvalidConfiguration, "Invalid
service URL")
}
@@ -101,10 +111,11 @@ func newClient(options ClientOptions) (Client, error) {
}
c := &client{
- cnxPool: internal.NewConnectionPool(tlsConfig, authProvider,
connectionTimeout, maxConnectionsPerHost),
+ cnxPool: internal.NewConnectionPool(tlsConfig, authProvider,
connectionTimeout, maxConnectionsPerHost, logger),
+ log: logger,
}
- c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout)
- c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig
!= nil)
+ c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout,
logger)
+ c.lookupService = internal.NewLookupService(c.rpcClient, url, tlsConfig
!= nil, logger)
c.handlers = internal.NewClientHandlers()
return c, nil
}
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 13339af..5cb1dd2 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -29,10 +29,9 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
- log "github.com/sirupsen/logrus"
-
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
var (
@@ -80,7 +79,7 @@ type consumer struct {
errorCh chan error
ticker *time.Ticker
- log *log.Entry
+ log log.Logger
}
func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
@@ -146,11 +145,11 @@ func newConsumer(client *client, options ConsumerOptions)
(Consumer, error) {
}
}
- dlq, err := newDlqRouter(client, options.DLQ)
+ dlq, err := newDlqRouter(client, options.DLQ, client.log)
if err != nil {
return nil, err
}
- rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable)
+ rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable,
client.log)
if err != nil {
return nil, err
}
@@ -206,7 +205,7 @@ func newInternalConsumer(client *client, options
ConsumerOptions, topic string,
errorCh: make(chan error),
dlq: dlq,
rlq: rlq,
- log: log.WithField("topic", topic),
+ log:
client.log.SubLogger(log.Fields{"topic": topic}),
consumerName: options.Name,
}
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 27146c4..7705590 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -26,7 +26,7 @@ import (
pkgerrors "github.com/pkg/errors"
- log "github.com/sirupsen/logrus"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
type multiTopicConsumer struct {
@@ -42,7 +42,7 @@ type multiTopicConsumer struct {
closeOnce sync.Once
closeCh chan struct{}
- log *log.Entry
+ log log.Logger
}
func newMultiTopicConsumer(client *client, options ConsumerOptions, topics
[]string,
@@ -54,7 +54,7 @@ func newMultiTopicConsumer(client *client, options
ConsumerOptions, topics []str
closeCh: make(chan struct{}),
dlq: dlq,
rlq: rlq,
- log: log.WithField("topics", topics),
+ log: client.log.SubLogger(log.Fields{"topic": topics}),
consumerName: options.Name,
}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 882afd7..9f555d2 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -28,11 +28,10 @@ import (
"github.com/gogo/protobuf/proto"
- log "github.com/sirupsen/logrus"
-
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
var (
@@ -159,7 +158,7 @@ type partitionConsumer struct {
nackTracker *negativeAcksTracker
dlq *dlqRouter
- log *log.Entry
+ log log.Logger
compressionProviders map[pb.CompressionType]compression.Provider
}
@@ -185,16 +184,18 @@ func newPartitionConsumer(parent Consumer, client
*client, options *partitionCon
clearQueueCh: make(chan func(id trackingMessageID)),
compressionProviders:
make(map[pb.CompressionType]compression.Provider),
dlq: dlq,
- log: log.WithField("topic", options.topic),
}
- pc.log = pc.log.WithField("name", pc.name).
- WithField("subscription", options.subscription).
- WithField("consumerID", pc.consumerID)
- pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay)
+ pc.log = client.log.SubLogger(log.Fields{
+ "name": pc.name,
+ "topic": options.topic,
+ "subscription": options.subscription,
+ "consumerID": pc.consumerID,
+ })
+ pc.nackTracker = newNegativeAcksTracker(pc,
options.nackRedeliveryDelay, pc.log)
err := pc.grabConn()
if err != nil {
- log.WithError(err).Errorf("Failed to create consumer")
+ pc.log.WithError(err).Error("Failed to create consumer")
return nil, err
}
pc.log.Info("Created consumer")
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index 13ef600..d7ca183 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -28,9 +28,8 @@ import (
pkgerrors "github.com/pkg/errors"
- log "github.com/sirupsen/logrus"
-
"github.com/apache/pulsar-client-go/pulsar/internal"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
const (
@@ -59,7 +58,7 @@ type regexConsumer struct {
ticker *time.Ticker
- log *log.Entry
+ log log.Logger
consumerName string
}
@@ -82,7 +81,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn
*internal.TopicName, p
closeCh: make(chan struct{}),
- log: log.WithField("topic", tn.Name),
+ log: c.log.SubLogger(log.Fields{"topic": tn.Name}),
consumerName: opts.Name,
}
@@ -280,13 +279,12 @@ func (c *regexConsumer) discover() {
newTopics := topicsDiff(topics, known)
staleTopics := topicsDiff(known, topics)
- if log.GetLevel() == log.DebugLevel {
- l := c.log.WithFields(log.Fields{
+ c.log.
+ WithFields(log.Fields{
"new_topics": newTopics,
"old_topics": staleTopics,
- })
- l.Debug("discover topics")
- }
+ }).
+ Debug("discover topics")
c.unsubscribeCh <- staleTopics
c.subscribeCh <- newTopics
@@ -306,9 +304,7 @@ func (c *regexConsumer) knownTopics() []string {
}
func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq
*retryRouter) {
- if log.GetLevel() == log.DebugLevel {
- c.log.WithField("topics", topics).Debug("subscribe")
- }
+ c.log.WithField("topics", topics).Debug("subscribe")
consumers := make(map[string]Consumer, len(topics))
for ce := range subscriber(c.client, topics, c.options, c.messageCh,
dlq, rlq) {
if ce.err != nil {
@@ -326,11 +322,11 @@ func (c *regexConsumer) subscribe(topics []string, dlq
*dlqRouter, rlq *retryRou
}
func (c *regexConsumer) unsubscribe(topics []string) {
- if log.GetLevel() == log.DebugLevel {
- c.log.WithField("topics", topics).Debug("unsubscribe")
- }
+ c.log.WithField("topics", topics).Debug("unsubscribe")
+
consumers := make(map[string]Consumer, len(topics))
c.consumersLock.Lock()
+
for _, t := range topics {
if consumer, ok := c.consumers[t]; ok {
consumers[t] = consumer
@@ -340,7 +336,7 @@ func (c *regexConsumer) unsubscribe(topics []string) {
c.consumersLock.Unlock()
for t, consumer := range consumers {
- log.Debugf("unsubscribe from topic=%s subscription=%s", t,
c.options.SubscriptionName)
+ c.log.Debugf("unsubscribe from topic=%s subscription=%s", t,
c.options.SubscriptionName)
if err := consumer.Unsubscribe(); err != nil {
c.log.Warnf("unable to unsubscribe from topic=%s
subscription=%s",
t, c.options.SubscriptionName)
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index e4acf5f..ed27373 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/apache/pulsar-client-go/pulsar/internal"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
func TestFilterTopics(t *testing.T) {
@@ -153,8 +154,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c
Client, namespace string
AutoDiscoveryPeriod: 5 * time.Minute,
}
- dlq, _ := newDlqRouter(c.(*client), nil)
- rlq, _ := newRetryRouter(c.(*client), nil, false)
+ dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger())
+ rlq, _ := newRetryRouter(c.(*client), nil, false,
log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern,
make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
@@ -202,8 +203,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c
Client, namespace string
AutoDiscoveryPeriod: 5 * time.Minute,
}
- dlq, _ := newDlqRouter(c.(*client), nil)
- rlq, _ := newRetryRouter(c.(*client), nil, false)
+ dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger())
+ rlq, _ := newRetryRouter(c.(*client), nil, false,
log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern,
make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 6aa7e2b..b4d98b0 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -23,7 +23,7 @@ import (
"time"
"github.com/apache/pulsar-client-go/pulsar/internal"
- log "github.com/sirupsen/logrus"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
type dlqRouter struct {
@@ -32,13 +32,14 @@ type dlqRouter struct {
policy *DLQPolicy
messageCh chan ConsumerMessage
closeCh chan interface{}
- log *log.Entry
+ log log.Logger
}
-func newDlqRouter(client Client, policy *DLQPolicy) (*dlqRouter, error) {
+func newDlqRouter(client Client, policy *DLQPolicy, logger log.Logger)
(*dlqRouter, error) {
r := &dlqRouter{
client: client,
policy: policy,
+ log: logger,
}
if policy != nil {
@@ -52,7 +53,7 @@ func newDlqRouter(client Client, policy *DLQPolicy)
(*dlqRouter, error) {
r.messageCh = make(chan ConsumerMessage)
r.closeCh = make(chan interface{}, 1)
- r.log = log.WithField("dlq-topic", policy.DeadLetterTopic)
+ r.log = logger.SubLogger(log.Fields{"dlq-topic":
policy.DeadLetterTopic})
go r.run()
}
return r, nil
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index fe34f0c..81dda2d 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -20,11 +20,11 @@ package internal
import (
"time"
- "github.com/apache/pulsar-client-go/pulsar/internal/compression"
- pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"
- log "github.com/sirupsen/logrus"
+ "github.com/apache/pulsar-client-go/pulsar/internal/compression"
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
const (
@@ -63,12 +63,14 @@ type BatchBuilder struct {
compressionProvider compression.Provider
buffersPool BuffersPool
+
+ log log.Logger
}
// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a
new batch message container.
func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string,
producerID uint64,
compressionType pb.CompressionType, level compression.Level,
- bufferPool BuffersPool) (*BatchBuilder, error) {
+ bufferPool BuffersPool, logger log.Logger) (*BatchBuilder, error) {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
@@ -92,6 +94,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint,
producerName string, p
callbacks: []interface{}{},
compressionProvider: getCompressionProvider(compressionType,
level),
buffersPool: bufferPool,
+ log: logger,
}
if compressionType != pb.CompressionType_NONE {
@@ -162,7 +165,7 @@ func (bb *BatchBuilder) Flush() (batchData Buffer,
sequenceID uint64, callbacks
// No-Op for empty batch
return nil, 0, nil
}
- log.Debug("BatchBuilder flush: messages: ", bb.numMessages)
+ bb.log.Debug("BatchBuilder flush: messages: ", bb.numMessages)
bb.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bb.numMessages))
bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))
@@ -198,7 +201,6 @@ func getCompressionProvider(compressionType
pb.CompressionType,
case pb.CompressionType_ZSTD:
return compression.NewZStdProvider(level)
default:
- log.Panic("unsupported compression type")
- return nil
+ panic("unsupported compression type")
}
}
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index c32262e..2536bae 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -21,12 +21,9 @@ import (
"errors"
"fmt"
- "github.com/apache/pulsar-client-go/pulsar/internal/compression"
-
"github.com/gogo/protobuf/proto"
- log "github.com/sirupsen/logrus"
-
+ "github.com/apache/pulsar-client-go/pulsar/internal/compression"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
)
@@ -196,7 +193,7 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg
proto.Message) *pb.BaseCommand
case pb.BaseCommand_AUTH_RESPONSE:
cmd.AuthResponse = msg.(*pb.CommandAuthResponse)
default:
- log.Panic("Missing command type: ", cmdType)
+ panic(fmt.Sprintf("Missing command type: %v", cmdType))
}
return cmd
@@ -209,7 +206,7 @@ func addSingleMessageToBatch(wb Buffer, smm
*pb.SingleMessageMetadata, payload [
wb.ResizeIfNeeded(metadataSize)
_, err := smm.MarshalToSizedBuffer(wb.WritableSlice()[:metadataSize])
if err != nil {
- log.WithError(err).Fatal("Protobuf serialization error")
+ panic(fmt.Sprintf("Protobuf serialization error: %v", err))
}
wb.WrittenBytes(metadataSize)
@@ -235,7 +232,7 @@ func serializeBatch(wb Buffer,
wb.ResizeIfNeeded(cmdSize)
_, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
if err != nil {
- log.WithError(err).Fatal("Protobuf error when serializing
cmdSend")
+ panic(fmt.Sprintf("Protobuf error when serializing cmdSend:
%v", err))
}
wb.WrittenBytes(cmdSize)
@@ -250,7 +247,7 @@ func serializeBatch(wb Buffer,
wb.ResizeIfNeeded(msgMetadataSize)
_, err =
msgMetadata.MarshalToSizedBuffer(wb.WritableSlice()[:msgMetadataSize])
if err != nil {
- log.WithError(err).Fatal("Protobuf error when serializing
msgMetadata")
+ panic(fmt.Sprintf("Protobuf error when serializing msgMetadata:
%v", err))
}
wb.WrittenBytes(msgMetadataSize)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1905d17..aff263b 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -33,10 +33,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/gogo/protobuf/proto"
- log "github.com/sirupsen/logrus"
"github.com/apache/pulsar-client-go/pulsar/internal/auth"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
const (
@@ -167,7 +167,7 @@ type connection struct {
pingTicker *time.Ticker
pingCheckTicker *time.Ticker
- log *log.Entry
+ log log.Logger
requestIDGenerator uint64
@@ -189,21 +189,30 @@ type connection struct {
maxMessageSize int32
}
-func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions
*TLSOptions,
- connectionTimeout time.Duration, auth auth.Provider) *connection {
+// connectionOptions defines configurations for creating connection.
+type connectionOptions struct {
+ logicalAddr *url.URL
+ physicalAddr *url.URL
+ tls *TLSOptions
+ connectionTimeout time.Duration
+ auth auth.Provider
+ logger log.Logger
+}
+
+func newConnection(opts connectionOptions) *connection {
cnx := &connection{
state: int32(connectionInit),
- connectionTimeout: connectionTimeout,
- logicalAddr: logicalAddr,
- physicalAddr: physicalAddr,
+ connectionTimeout: opts.connectionTimeout,
+ logicalAddr: opts.logicalAddr,
+ physicalAddr: opts.physicalAddr,
writeBuffer: NewBuffer(4096),
- log: log.WithField("remote_addr",
physicalAddr),
+ log:
opts.logger.SubLogger(log.Fields{"remote_addr": opts.physicalAddr}),
pendingReqs: make(map[uint64]*request),
lastDataReceivedTime: time.Now(),
pingTicker: time.NewTicker(keepAliveInterval),
pingCheckTicker: time.NewTicker(keepAliveInterval),
- tlsOptions: tlsOptions,
- auth: auth,
+ tlsOptions: opts.tls,
+ auth: opts.auth,
closeCh: make(chan interface{}),
incomingRequestsCh: make(chan *request, 10),
@@ -273,7 +282,7 @@ func (c *connection) connect() bool {
c.Lock()
c.cnx = cnx
- c.log = c.log.WithField("local_addr", c.cnx.LocalAddr())
+ c.log = c.log.SubLogger(log.Fields{"local_addr": c.cnx.LocalAddr()})
c.log.Info("TCP connection established")
c.Unlock()
@@ -473,7 +482,8 @@ func (c *connection) writeCommand(cmd *pb.BaseCommand) {
c.writeBuffer.ResizeIfNeeded(cmdSize)
_, err :=
cmd.MarshalToSizedBuffer(c.writeBuffer.WritableSlice()[:cmdSize])
if err != nil {
- c.log.WithError(err).Fatal("Protobuf serialization error")
+ c.log.WithError(err).Error("Protobuf serialization error")
+ panic("Protobuf serialization error")
}
c.writeBuffer.WrittenBytes(cmdSize)
@@ -628,7 +638,10 @@ func (c *connection) handleMessage(response
*pb.CommandMessage, payload Buffer)
if consumer, ok := c.consumerHandler(consumerID); ok {
err := consumer.MessageReceived(response, payload)
if err != nil {
- c.log.WithField("consumerID",
consumerID).WithError(err).Error("handle message Id: ", response.MessageId)
+ c.log.
+ WithError(err).
+ WithField("consumerID", consumerID).
+ Error("handle message Id: ", response.MessageId)
}
} else {
c.log.WithField("consumerID", consumerID).Warn("Got unexpected
message: ", response.MessageId)
diff --git a/pulsar/internal/connection_pool.go
b/pulsar/internal/connection_pool.go
index a90ac42..133b280 100644
--- a/pulsar/internal/connection_pool.go
+++ b/pulsar/internal/connection_pool.go
@@ -25,8 +25,7 @@ import (
"time"
"github.com/apache/pulsar-client-go/pulsar/internal/auth"
-
- log "github.com/sirupsen/logrus"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
// ConnectionPool is a interface of connection pool.
@@ -45,6 +44,8 @@ type connectionPool struct {
auth auth.Provider
maxConnectionsPerHost int32
roundRobinCnt int32
+
+ log log.Logger
}
// NewConnectionPool init connection pool.
@@ -52,12 +53,14 @@ func NewConnectionPool(
tlsOptions *TLSOptions,
auth auth.Provider,
connectionTimeout time.Duration,
- maxConnectionsPerHost int) ConnectionPool {
+ maxConnectionsPerHost int,
+ logger log.Logger) ConnectionPool {
return &connectionPool{
tlsOptions: tlsOptions,
auth: auth,
connectionTimeout: connectionTimeout,
maxConnectionsPerHost: int32(maxConnectionsPerHost),
+ log: logger,
}
}
@@ -66,7 +69,7 @@ func (p *connectionPool) GetConnection(logicalAddr *url.URL,
physicalAddr *url.U
cachedCnx, found := p.pool.Load(key)
if found {
cnx := cachedCnx.(*connection)
- log.Debug("Found connection in cache:", cnx.logicalAddr,
cnx.physicalAddr)
+ p.log.Debug("Found connection in cache:", cnx.logicalAddr,
cnx.physicalAddr)
if err := cnx.waitUntilReady(); err == nil {
// Connection is ready to be used
@@ -74,11 +77,18 @@ func (p *connectionPool) GetConnection(logicalAddr
*url.URL, physicalAddr *url.U
}
// The cached connection is failed
p.pool.Delete(key)
- log.Debug("Removed failed connection from pool:",
cnx.logicalAddr, cnx.physicalAddr)
+ p.log.Debug("Removed failed connection from pool:",
cnx.logicalAddr, cnx.physicalAddr)
}
// Try to create a new connection
- newConnection := newConnection(logicalAddr, physicalAddr, p.tlsOptions,
p.connectionTimeout, p.auth)
+ newConnection := newConnection(connectionOptions{
+ logicalAddr: logicalAddr,
+ physicalAddr: physicalAddr,
+ tls: p.tlsOptions,
+ connectionTimeout: p.connectionTimeout,
+ auth: p.auth,
+ logger: p.log,
+ })
newCnx, wasCached := p.pool.LoadOrStore(key, newConnection)
cnx := newCnx.(*connection)
diff --git a/pulsar/internal/lookup_service.go
b/pulsar/internal/lookup_service.go
index 1673f4d..00f54ef 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -22,13 +22,12 @@ import (
"fmt"
"net/url"
+ "github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
- "github.com/gogo/protobuf/proto"
-
- log "github.com/sirupsen/logrus"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
var (
@@ -55,14 +54,17 @@ type lookupService struct {
rpcClient RPCClient
serviceURL *url.URL
tlsEnabled bool
+ log log.Logger
}
// NewLookupService init a lookup service struct and return an object of
LookupService.
-func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, tlsEnabled
bool) LookupService {
+func NewLookupService(rpcClient RPCClient, serviceURL *url.URL,
+ tlsEnabled bool, logger log.Logger) LookupService {
return &lookupService{
rpcClient: rpcClient,
serviceURL: serviceURL,
tlsEnabled: tlsEnabled,
+ log: logger.SubLogger(log.Fields{"serviceURL":
serviceURL}),
}
}
@@ -102,7 +104,7 @@ func (ls *lookupService) Lookup(topic string)
(*LookupResult, error) {
if err != nil {
return nil, err
}
- log.Debugf("Got topic{%s} lookup response: %+v", topic, res)
+ ls.log.Debugf("Got topic{%s} lookup response: %+v", topic, res)
for i := 0; i < lookupResultMaxRedirect; i++ {
lr := res.Response.LookupTopicResponse
@@ -114,7 +116,7 @@ func (ls *lookupService) Lookup(topic string)
(*LookupResult, error) {
return nil, err
}
- log.Debugf("Follow topic{%s} redirect to broker. %v /
%v - Use proxy: %v",
+ ls.log.Debugf("Follow topic{%s} redirect to broker. %v
/ %v - Use proxy: %v",
topic, lr.BrokerServiceUrl,
lr.BrokerServiceUrlTls, lr.ProxyThroughServiceUrl)
id := ls.rpcClient.NewRequestID()
@@ -131,7 +133,7 @@ func (ls *lookupService) Lookup(topic string)
(*LookupResult, error) {
continue
case pb.CommandLookupTopicResponse_Connect:
- log.Debugf("Successfully looked up topic{%s} on broker.
%s / %s - Use proxy: %t",
+ ls.log.Debugf("Successfully looked up topic{%s} on
broker. %s / %s - Use proxy: %t",
topic, lr.GetBrokerServiceUrl(),
lr.GetBrokerServiceUrlTls(), lr.GetProxyThroughServiceUrl())
logicalAddress, physicalAddress, err :=
ls.getBrokerAddress(lr)
@@ -149,7 +151,7 @@ func (ls *lookupService) Lookup(topic string)
(*LookupResult, error) {
if lr.Error != nil {
errorMsg = lr.Error.String()
}
- log.Warnf("Failed to lookup topic: %s, error msg: %s",
topic, errorMsg)
+ ls.log.Warnf("Failed to lookup topic: %s, error msg:
%s", topic, errorMsg)
return nil, fmt.Errorf("failed to lookup topic: %s",
errorMsg)
}
}
diff --git a/pulsar/internal/lookup_service_test.go
b/pulsar/internal/lookup_service_test.go
index 1eead50..fa22f18 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -21,9 +21,11 @@ import (
"net/url"
"testing"
- pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
+
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
type mockedRPCClient struct {
@@ -127,7 +129,7 @@ func TestLookupSuccess(t *testing.T) {
BrokerServiceUrl:
proto.String("pulsar://broker-1:6650"),
},
},
- }, url, false)
+ }, url, false, log.DefaultNopLogger())
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -159,7 +161,7 @@ func TestTlsLookupSuccess(t *testing.T) {
BrokerServiceUrlTls:
proto.String("pulsar+ssl://broker-1:6651"),
},
},
- }, url, true)
+ }, url, true, log.DefaultNopLogger())
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -192,7 +194,7 @@ func TestLookupWithProxy(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(true),
},
},
- }, url, false)
+ }, url, false, log.DefaultNopLogger())
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -225,7 +227,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(true),
},
},
- }, url, true)
+ }, url, true, log.DefaultNopLogger())
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -269,7 +271,7 @@ func TestLookupWithRedirect(t *testing.T) {
BrokerServiceUrl:
proto.String("pulsar://broker-1:6650"),
},
},
- }, url, false)
+ }, url, false, log.DefaultNopLogger())
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -313,7 +315,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
BrokerServiceUrlTls:
proto.String("pulsar+ssl://broker-1:6651"),
},
},
- }, url, true)
+ }, url, true, log.DefaultNopLogger())
lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
@@ -346,7 +348,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
ProxyThroughServiceUrl: proto.Bool(false),
},
},
- }, url, false)
+ }, url, false, log.DefaultNopLogger())
lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
@@ -374,7 +376,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
Authoritative: proto.Bool(true),
},
},
- }, url, false)
+ }, url, false, log.DefaultNopLogger())
lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 0d4cc12..c7d810a 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -27,10 +27,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
+
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/gogo/protobuf/proto"
-
- log "github.com/sirupsen/logrus"
)
var (
@@ -71,16 +71,16 @@ type rpcClient struct {
requestIDGenerator uint64
producerIDGenerator uint64
consumerIDGenerator uint64
-
- log *log.Entry
+ log log.Logger
}
-func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout
time.Duration) RPCClient {
+func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
+ requestTimeout time.Duration, logger log.Logger) RPCClient {
return &rpcClient{
serviceURL: serviceURL,
pool: pool,
requestTimeout: requestTimeout,
- log: log.WithField("serviceURL", serviceURL),
+ log: logger.SubLogger(log.Fields{"serviceURL":
serviceURL}),
}
}
diff --git a/pulsar/log/log.go b/pulsar/log/log.go
new file mode 100644
index 0000000..7ed5231
--- /dev/null
+++ b/pulsar/log/log.go
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package log
+
+// DefaultNopLogger returns a nop logger.
+func DefaultNopLogger() Logger {
+ return nopLogger{}
+}
+
+type nopLogger struct{}
+
+func (l nopLogger) SubLogger(fields Fields) Logger { return l }
+func (l nopLogger) WithFields(fields Fields) Entry { return
nopEntry{} }
+func (l nopLogger) WithField(name string, value interface{}) Entry { return
nopEntry{} }
+func (l nopLogger) WithError(err error) Entry { return
nopEntry{} }
+func (l nopLogger) Debug(args ...interface{}) {}
+func (l nopLogger) Info(args ...interface{}) {}
+func (l nopLogger) Warn(args ...interface{}) {}
+func (l nopLogger) Error(args ...interface{}) {}
+func (l nopLogger) Debugf(format string, args ...interface{}) {}
+func (l nopLogger) Infof(format string, args ...interface{}) {}
+func (l nopLogger) Warnf(format string, args ...interface{}) {}
+func (l nopLogger) Errorf(format string, args ...interface{}) {}
+
+type nopEntry struct{}
+
+func (e nopEntry) WithFields(fields Fields) Entry { return
nopEntry{} }
+func (e nopEntry) WithField(name string, value interface{}) Entry { return
nopEntry{} }
+
+func (e nopEntry) Debug(args ...interface{}) {}
+func (e nopEntry) Info(args ...interface{}) {}
+func (e nopEntry) Warn(args ...interface{}) {}
+func (e nopEntry) Error(args ...interface{}) {}
+func (e nopEntry) Debugf(format string, args ...interface{}) {}
+func (e nopEntry) Infof(format string, args ...interface{}) {}
+func (e nopEntry) Warnf(format string, args ...interface{}) {}
+func (e nopEntry) Errorf(format string, args ...interface{}) {}
diff --git a/pulsar/log/logger.go b/pulsar/log/logger.go
new file mode 100644
index 0000000..693416c
--- /dev/null
+++ b/pulsar/log/logger.go
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package log defines the logger interfaces used by pulsar client.
+// Users can leverage these interfaces to provide a customized logger
+// implementation.
+//
+// The Logger and Entry interfaces defined here are inspired
+// by sirupsen/logrus, both logrus and zap logging libraries
+// are good resources to learn how to implement a effective
+// logging library.
+//
+// Besides the interfaces, this log library also provides an
+// implementation based on logrus, and a No-op one as well.
+package log
+
+// Fields type, used to pass to `WithFields`.
+type Fields map[string]interface{}
+
+// Logger describes the interface that must be implemeted by all loggers.
+type Logger interface {
+ SubLogger(fields Fields) Logger
+
+ WithFields(fields Fields) Entry
+ WithField(name string, value interface{}) Entry
+ WithError(err error) Entry
+
+ Debug(args ...interface{})
+ Info(args ...interface{})
+ Warn(args ...interface{})
+ Error(args ...interface{})
+
+ Debugf(format string, args ...interface{})
+ Infof(format string, args ...interface{})
+ Warnf(format string, args ...interface{})
+ Errorf(format string, args ...interface{})
+}
+
+// Entry describes the interface for the logger entry.
+type Entry interface {
+ WithFields(fields Fields) Entry
+ WithField(name string, value interface{}) Entry
+
+ Debug(args ...interface{})
+ Info(args ...interface{})
+ Warn(args ...interface{})
+ Error(args ...interface{})
+
+ Debugf(format string, args ...interface{})
+ Infof(format string, args ...interface{})
+ Warnf(format string, args ...interface{})
+ Errorf(format string, args ...interface{})
+}
diff --git a/pulsar/log/wrapper_logrus.go b/pulsar/log/wrapper_logrus.go
new file mode 100644
index 0000000..efcf1e9
--- /dev/null
+++ b/pulsar/log/wrapper_logrus.go
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package log
+
+import (
+ "github.com/sirupsen/logrus"
+)
+
+// logrusWrapper implements Logger interface
+// based on underlying logrus.FieldLogger
+type logrusWrapper struct {
+ l logrus.FieldLogger
+}
+
+// NewLoggerWithLogrus creates a new logger which wraps
+// the given logrus.Logger
+func NewLoggerWithLogrus(logger *logrus.Logger) Logger {
+ return &logrusWrapper{
+ l: logger,
+ }
+}
+
+func (l *logrusWrapper) SubLogger(fs Fields) Logger {
+ return &logrusWrapper{
+ l: l.l.WithFields(logrus.Fields(fs)),
+ }
+}
+
+func (l *logrusWrapper) WithFields(fs Fields) Entry {
+ return logrusEntry{
+ e: l.l.WithFields(logrus.Fields(fs)),
+ }
+}
+
+func (l *logrusWrapper) WithField(name string, value interface{}) Entry {
+ return logrusEntry{
+ e: l.l.WithField(name, value),
+ }
+}
+
+func (l *logrusWrapper) WithError(err error) Entry {
+ return logrusEntry{
+ e: l.l.WithError(err),
+ }
+}
+
+func (l *logrusWrapper) Debug(args ...interface{}) {
+ l.l.Debug(args)
+}
+
+func (l *logrusWrapper) Info(args ...interface{}) {
+ l.l.Info(args)
+}
+
+func (l *logrusWrapper) Warn(args ...interface{}) {
+ l.l.Warn(args)
+}
+
+func (l *logrusWrapper) Error(args ...interface{}) {
+ l.l.Error(args)
+}
+
+func (l *logrusWrapper) Debugf(format string, args ...interface{}) {
+ l.l.Debugf(format, args)
+}
+
+func (l *logrusWrapper) Infof(format string, args ...interface{}) {
+ l.l.Infof(format, args)
+}
+
+func (l *logrusWrapper) Warnf(format string, args ...interface{}) {
+ l.l.Warnf(format, args)
+}
+
+func (l *logrusWrapper) Errorf(format string, args ...interface{}) {
+ l.l.Errorf(format, args)
+}
+
+type logrusEntry struct {
+ e logrus.FieldLogger
+}
+
+func (l logrusEntry) WithFields(fs Fields) Entry {
+ return logrusEntry{
+ e: l.e.WithFields(logrus.Fields(fs)),
+ }
+}
+
+func (l logrusEntry) WithField(name string, value interface{}) Entry {
+ return logrusEntry{
+ e: l.e.WithField(name, value),
+ }
+}
+
+func (l logrusEntry) Debug(args ...interface{}) {
+ l.e.Debug(args)
+}
+
+func (l logrusEntry) Info(args ...interface{}) {
+ l.e.Info(args)
+}
+
+func (l logrusEntry) Warn(args ...interface{}) {
+ l.e.Warn(args)
+}
+
+func (l logrusEntry) Error(args ...interface{}) {
+ l.e.Error(args)
+}
+
+func (l logrusEntry) Debugf(format string, args ...interface{}) {
+ l.e.Debugf(format, args)
+}
+
+func (l logrusEntry) Infof(format string, args ...interface{}) {
+ l.e.Infof(format, args)
+}
+
+func (l logrusEntry) Warnf(format string, args ...interface{}) {
+ l.e.Warnf(format, args)
+}
+
+func (l logrusEntry) Errorf(format string, args ...interface{}) {
+ l.e.Errorf(format, args)
+}
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index a7dc88e..23a8a91 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -21,7 +21,7 @@ import (
"sync"
"time"
- log "github.com/sirupsen/logrus"
+ log "github.com/apache/pulsar-client-go/pulsar/log"
)
type redeliveryConsumer interface {
@@ -36,15 +36,17 @@ type negativeAcksTracker struct {
rc redeliveryConsumer
tick *time.Ticker
delay time.Duration
+ log log.Logger
}
-func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration)
*negativeAcksTracker {
+func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration, logger
log.Logger) *negativeAcksTracker {
t := &negativeAcksTracker{
doneCh: make(chan interface{}),
negativeAcks: make(map[messageID]time.Time),
rc: rc,
tick: time.NewTicker(delay / 3),
delay: delay,
+ log: logger,
}
go t.track()
@@ -77,7 +79,7 @@ func (t *negativeAcksTracker) track() {
for {
select {
case <-t.doneCh:
- log.Debug("Closing nack tracker")
+ t.log.Debug("Closing nack tracker")
return
case <-t.tick.C:
@@ -87,9 +89,9 @@ func (t *negativeAcksTracker) track() {
now := time.Now()
msgIds := make([]messageID, 0)
for msgID, targetTime := range t.negativeAcks {
- log.Debugf("MsgId: %v -- targetTime: %v
-- now: %v", msgID, targetTime, now)
+ t.log.Debugf("MsgId: %v -- targetTime:
%v -- now: %v", msgID, targetTime, now)
if targetTime.Before(now) {
- log.Debugf("Adding MsgId: %v",
msgID)
+ t.log.Debugf("Adding MsgId:
%v", msgID)
msgIds = append(msgIds, msgID)
delete(t.negativeAcks, msgID)
}
diff --git a/pulsar/negative_acks_tracker_test.go
b/pulsar/negative_acks_tracker_test.go
index 3930b7f..27e07d8 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -23,6 +23,7 @@ import (
"testing"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/log"
"github.com/stretchr/testify/assert"
)
@@ -74,7 +75,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID {
func TestNacksTracker(t *testing.T) {
nmc := newNackMockedConsumer()
- nacks := newNegativeAcksTracker(nmc, testNackDelay)
+ nacks := newNegativeAcksTracker(nmc, testNackDelay,
log.DefaultNopLogger())
nacks.Add(messageID{
ledgerID: 1,
@@ -105,7 +106,7 @@ func TestNacksTracker(t *testing.T) {
func TestNacksWithBatchesTracker(t *testing.T) {
nmc := newNackMockedConsumer()
- nacks := newNegativeAcksTracker(nmc, testNackDelay)
+ nacks := newNegativeAcksTracker(nmc, testNackDelay,
log.DefaultNopLogger())
nacks.Add(messageID{
ledgerID: 1,
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 458d2b5..25499ce 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -27,9 +27,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
- log "github.com/sirupsen/logrus"
-
"github.com/apache/pulsar-client-go/pulsar/internal"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
var (
@@ -60,8 +59,7 @@ type producer struct {
messageRouter func(*ProducerMessage, TopicMetadata) int
ticker *time.Ticker
tickerStop chan struct{}
-
- log *log.Entry
+ log log.Logger
}
const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
@@ -88,7 +86,7 @@ func newProducer(client *client, options *ProducerOptions)
(*producer, error) {
options: options,
topic: options.Topic,
client: client,
- log: log.WithField("topic", options.Topic),
+ log: client.log.SubLogger(log.Fields{"topic":
options.Topic}),
}
var batchingMaxPublishDelay time.Duration
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 870da2e..e8cf0f7 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -31,10 +31,9 @@ import (
"github.com/gogo/protobuf/proto"
- log "github.com/sirupsen/logrus"
-
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
const (
@@ -89,7 +88,7 @@ type partitionProducer struct {
state int32
client *client
topic string
- log *log.Entry
+ log log.Logger
cnx internal.Connection
options *ProducerOptions
@@ -125,11 +124,12 @@ func newPartitionProducer(client *client, topic string,
options *ProducerOptions
maxPendingMessages = options.MaxPendingMessages
}
+ logger := client.log.SubLogger(log.Fields{"topic": topic})
p := &partitionProducer{
state: producerInit,
- log: log.WithField("topic", topic),
client: client,
topic: topic,
+ log: logger,
options: options,
producerID: client.rpcClient.NewProducerID(),
eventsChan: make(chan interface{}, maxPendingMessages),
@@ -146,12 +146,15 @@ func newPartitionProducer(client *client, topic string,
options *ProducerOptions
err := p.grabCnx()
if err != nil {
- log.WithError(err).Errorf("Failed to create producer")
+ logger.WithError(err).Error("Failed to create producer")
return nil, err
}
- p.log = p.log.WithField("producer_name", p.producerName).
- WithField("producerID", p.producerID)
+ p.log = p.log.SubLogger(log.Fields{
+ "producer_name": p.producerName,
+ "producerID": p.producerID,
+ })
+
p.log.WithField("cnx", p.cnx.ID()).Info("Created producer")
atomic.StoreInt32(&p.state, producerReady)
@@ -195,7 +198,8 @@ func (p *partitionProducer) grabCnx() error {
p.batchBuilder, err =
internal.NewBatchBuilder(p.options.BatchingMaxMessages,
p.options.BatchingMaxSize,
p.producerName, p.producerID,
pb.CompressionType(p.options.CompressionType),
compression.Level(p.options.CompressionLevel),
- p)
+ p,
+ p.log)
if err != nil {
return err
}
@@ -295,9 +299,10 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
if len(msg.Payload) > int(p.cnx.GetMaxMessageSize()) {
p.publishSemaphore.Release()
request.callback(nil, request.msg, errMessageTooLarge)
- p.log.WithField("size", len(msg.Payload)).
+ p.log.WithError(errMessageTooLarge).
+ WithField("size", len(msg.Payload)).
WithField("properties", msg.Properties).
- WithError(errMessageTooLarge).Error()
+ Error()
publishErrors.Inc()
return
}
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 17a7084..89e9063 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -26,7 +26,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
- log "github.com/sirupsen/logrus"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
const (
@@ -50,8 +50,7 @@ type reader struct {
pc *partitionConsumer
messageCh chan ConsumerMessage
lastMessageInBroker trackingMessageID
-
- log *log.Entry
+ log log.Logger
}
func newReader(client *client, options ReaderOptions) (Reader, error) {
@@ -106,11 +105,11 @@ func newReader(client *client, options ReaderOptions)
(Reader, error) {
reader := &reader{
messageCh: make(chan ConsumerMessage),
- log: log.WithField("topic", options.Topic),
+ log: client.log.SubLogger(log.Fields{"topic":
options.Topic}),
}
// Provide dummy dlq router with not dlq policy
- dlq, err := newDlqRouter(client, nil)
+ dlq, err := newDlqRouter(client, nil, client.log)
if err != nil {
return nil, err
}
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index b16417d..3eb12f4 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -23,7 +23,7 @@ import (
"time"
"github.com/apache/pulsar-client-go/pulsar/internal"
- log "github.com/sirupsen/logrus"
+ "github.com/apache/pulsar-client-go/pulsar/log"
)
const (
@@ -49,13 +49,14 @@ type retryRouter struct {
policy *DLQPolicy
messageCh chan RetryMessage
closeCh chan interface{}
- log *log.Entry
+ log log.Logger
}
-func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool)
(*retryRouter, error) {
+func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool,
logger log.Logger) (*retryRouter, error) {
r := &retryRouter{
client: client,
policy: policy,
+ log: logger,
}
if policy != nil && retryEnabled {
@@ -69,7 +70,7 @@ func newRetryRouter(client Client, policy *DLQPolicy,
retryEnabled bool) (*retry
r.messageCh = make(chan RetryMessage)
r.closeCh = make(chan interface{}, 1)
- r.log = log.WithField("rlq-topic", policy.RetryLetterTopic)
+ r.log = logger.SubLogger(log.Fields{"rlq-topic":
policy.RetryLetterTopic})
go r.run()
}
return r, nil