This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 5004b5d  Enable golangci lint for project (#138)
5004b5d is described below

commit 5004b5d81001f7276091b11699cf568778d83556
Author: 冉小龙 <[email protected]>
AuthorDate: Fri Dec 27 10:51:34 2019 +0800

    Enable golangci lint for project (#138)
    
    ### Modifications
    
    Enable golangci-lint for the project, with the following checks enabled:
    
    ```
        - bodyclose
        - deadcode
        - gocritic
        - goimports
        - golint
        - gosimple
        - govet
        - ineffassign
        - interfacer
        - misspell
        - staticcheck
        - structcheck
        - stylecheck
        - typecheck
        - unconvert
        - unparam
        - unused
        - varcheck
        - lll
        - prealloc
        - maligned
    ```
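    
    These checks run in CI through the golangci-lint Docker image (see the docker-ci.sh
    change below). A minimal local run sketch, assuming Docker is available and the
    repository root is the current directory:
    
    ```
    # Same lint invocation that CI uses (golangci-lint v1.21.0 via Docker)
    docker run --rm -v $(pwd):/app -w /app golangci/golangci-lint:v1.21.0 \
        golangci-lint run -v -c ./.golangci.yml ./...
    ```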
---
 .gitignore                             |  2 ++
 .golangci.yml                          | 44 ++++++++++++++++------------------
 docker-ci.sh                           |  5 +++-
 perf/perf-producer.go                  |  3 ++-
 perf/pulsar-perf-go.go                 |  2 +-
 pulsar/client_impl.go                  | 12 +++-------
 pulsar/client_impl_test.go             | 14 ++++++-----
 pulsar/consumer.go                     |  4 ++--
 pulsar/consumer_impl.go                |  3 +--
 pulsar/consumer_multitopic_test.go     | 23 +++++++++++-------
 pulsar/consumer_partition.go           | 25 +++++++++----------
 pulsar/consumer_regex.go               |  4 ++--
 pulsar/consumer_regex_test.go          | 11 +++++----
 pulsar/error.go                        | 17 ++++++++-----
 pulsar/impl_message.go                 |  6 -----
 pulsar/impl_message_test.go            |  4 ++--
 pulsar/internal/batch_builder.go       |  2 +-
 pulsar/internal/blocking_queue_test.go |  6 ++---
 pulsar/internal/commands.go            |  7 +++---
 pulsar/internal/compression/zstd.go    |  3 ++-
 pulsar/internal/connection.go          | 22 ++++++++---------
 pulsar/internal/default_router.go      |  3 ++-
 pulsar/internal/lookup_service.go      |  3 ++-
 pulsar/internal/lookup_service_test.go |  6 +++--
 pulsar/internal/rpc_client.go          | 10 +++++---
 pulsar/message.go                      |  3 ++-
 pulsar/negative_acks_tracker.go        | 13 +++++-----
 pulsar/negative_acks_tracker_test.go   |  1 -
 pulsar/producer.go                     | 13 ++++++----
 pulsar/producer_impl.go                |  3 ++-
 pulsar/producer_partition.go           | 22 ++++++++---------
 pulsar/producer_test.go                |  3 ++-
 pulsar/test_helper.go                  |  7 ++----
 run-ci.sh                              |  2 +-
 34 files changed, 163 insertions(+), 145 deletions(-)

diff --git a/.gitignore b/.gitignore
index 7daaebc..bc14ef8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,5 @@
 *.out
 
 perf/perf
+pulsar-perf
+bin
diff --git a/.golangci.yml b/.golangci.yml
index 272cc9a..18560b2 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -2,29 +2,27 @@
 # https://github.com/golangci/golangci-lint#install
 # We can execute `golangci-lint run` for code checking.
 run:
-  tests: true
-  skip-dirs:
-    - _examples
-
-output:
-  print-issued-lines: false
+  deadline: 6m
 
 linters:
-  enable-all: true
-  disable:
-    - maligned
-    - megacheck
+  disable-all: true
+  enable:
+    - bodyclose
+    - deadcode
+    - goimports
+    - golint
+    - gosimple
+    - govet
+    - ineffassign
+    - interfacer
+    - misspell
+    - staticcheck
+    - structcheck
+    - stylecheck
+    - typecheck
+    - unconvert
+    - unparam
+    - varcheck
     - lll
-    - gocyclo
-    - gochecknoglobals
-
-linters-settings:
-  govet:
-    check-shadowing: true
-  gocyclo:
-    min-complexity: 10
-  dupl:
-    threshold: 100
-  goconst:
-    min-len: 8
-    min-occurrences: 3
+    - prealloc
+    - maligned
diff --git a/docker-ci.sh b/docker-ci.sh
index 81486cf..f6924db 100755
--- a/docker-ci.sh
+++ b/docker-ci.sh
@@ -20,6 +20,9 @@
 
 set -e -x
 
+# Project code style check
+docker run --rm -v $(pwd):/app -w /app golangci/golangci-lint:v1.21.0 golangci-lint run -v -c ./.golangci.yml ./...
+
 SRC_DIR=$(git rev-parse --show-toplevel)
 cd ${SRC_DIR}
 
@@ -28,4 +31,4 @@ IMAGE_NAME=pulsar-client-go-test:latest
 docker build -t ${IMAGE_NAME} .
 
 docker run -i -v ${PWD}:/pulsar-client-go ${IMAGE_NAME} \
-       bash -c "cd /pulsar-client-go && ./run-ci.sh"
\ No newline at end of file
+       bash -c "cd /pulsar-client-go && ./run-ci.sh"
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 035285b..5660a80 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -141,7 +141,8 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
                        return
                case <-tick.C:
                        messageRate := float64(messagesPublished) / float64(10)
-                       log.Infof(`Stats - Publish rate: %6.1f msg/s - %6.1f Mbps - Latency ms: 50%% %5.1f - 95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f`,
+                       log.Infof(`Stats - Publish rate: %6.1f msg/s - %6.1f Mbps - 
+                               Latency ms: 50%% %5.1f -95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max %6.1f`,
                                messageRate,
                                messageRate*float64(produceArgs.MessageSize)/1024/1024*8,
                                q.Query(0.5)*1000,
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index f75e5d0..3ceff71 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -32,7 +32,7 @@ import (
        "github.com/apache/pulsar-client-go/pulsar"
 )
 
-// global flags
+// FlagProfile is a global flag
 var FlagProfile bool
 var flagDebug bool
 
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 7873b56..04d2d2b 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -33,21 +33,15 @@ import (
 )
 
 const (
-       defaultConnectionTimeout = 30*time.Second
-       defaultOperationTimeout = 30*time.Second
+       defaultConnectionTimeout = 30 * time.Second
+       defaultOperationTimeout  = 30 * time.Second
 )
 
 type client struct {
-       options ClientOptions
-
        cnxPool       internal.ConnectionPool
        rpcClient     internal.RPCClient
+       handlers      internal.ClientHandlers
        lookupService internal.LookupService
-       auth          auth.Provider
-
-       handlers            internal.ClientHandlers
-       producerIDGenerator uint64
-       consumerIDGenerator uint64
 }
 
 func newClient(options ClientOptions) (Client, error) {
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 841069a..8917c46 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -19,9 +19,10 @@ package pulsar
 
 import (
        "fmt"
-       "github.com/stretchr/testify/assert"
        "io/ioutil"
        "testing"
+
+       "github.com/stretchr/testify/assert"
 )
 
 func TestClient(t *testing.T) {
@@ -176,7 +177,7 @@ func TestTokenAuth(t *testing.T) {
 
 func TestTokenAuthWithSupplier(t *testing.T) {
        client, err := NewClient(ClientOptions{
-               URL:            serviceURL,
+               URL: serviceURL,
                Authentication: NewAuthenticationTokenFromSupplier(func() (s string, err error) {
                        token, err := ioutil.ReadFile(tokenFilePath)
                        if err != nil {
@@ -224,8 +225,8 @@ func TestTopicPartitions(t *testing.T) {
        defer client.Close()
 
        // Create topic with 5 partitions
-       httpPut("admin/v2/persistent/public/default/TestGetTopicPartitions/partitions",
-               5)
+       err = httpPut("admin/v2/persistent/public/default/TestGetTopicPartitions/partitions", 5)
+       assert.Nil(t, err)
 
        partitionedTopic := "persistent://public/default/TestGetTopicPartitions"
 
@@ -260,14 +261,15 @@ func TestNamespaceTopicsNamespaceDoesNotExit(t *testing.T) {
        // fetch from namespace that does not exist
        name := generateRandomName()
        topics, err := ci.namespaceTopics(fmt.Sprintf("%s/%s", name, name))
+       assert.Nil(t, err)
        assert.Equal(t, 0, len(topics))
 }
 
 func TestNamespaceTopics(t *testing.T) {
        name := generateRandomName()
        namespace := fmt.Sprintf("public/%s", name)
-       namespaceUrl := fmt.Sprintf("admin/v2/namespaces/%s", namespace)
-       err := httpPut(namespaceUrl, anonymousNamespacePolicy())
+       namespaceURL := fmt.Sprintf("admin/v2/namespaces/%s", namespace)
+       err := httpPut(namespaceURL, anonymousNamespacePolicy())
        if err != nil {
                t.Fatal()
        }
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 591cab5..a98eb2d 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -45,8 +45,8 @@ const (
        // If that consumer disconnects, one of the other connected consumers will start receiving messages.
        Failover
 
-       // KeyShared subscription mode, multiple consumer will be able to use the same subscription and all messages with the same key
-       // will be dispatched to only one consumer
+       // KeyShared subscription mode, multiple consumer will be able to use the same
+       // subscription and all messages with the same key will be dispatched to only one consumer
        KeyShared
 )
 
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 4a1f85e..c962e52 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -41,8 +41,7 @@ type acker interface {
 }
 
 type consumer struct {
-       options ConsumerOptions
-
+       options   ConsumerOptions
        consumers []*partitionConsumer
 
        // channel used to deliver message to clients
diff --git a/pulsar/consumer_multitopic_test.go b/pulsar/consumer_multitopic_test.go
index 6ac9da0..47704f9 100644
--- a/pulsar/consumer_multitopic_test.go
+++ b/pulsar/consumer_multitopic_test.go
@@ -54,7 +54,7 @@ func TestMultiTopicConsumerReceive(t *testing.T) {
                if err != nil {
                        t.Fatal(err)
                }
-               err = genMessages(p, 5, func(idx int) string {
+               err = genMessages(p, 10, func(idx int) string {
                        return fmt.Sprintf("topic-%d-hello-%d", i+1, idx)
                })
                p.Close()
@@ -65,16 +65,21 @@ func TestMultiTopicConsumerReceive(t *testing.T) {
 
        receivedTopic1 := 0
        receivedTopic2 := 0
-       for receivedTopic1+receivedTopic2 < 10 {
+       // nolint
+       for receivedTopic1+receivedTopic2 < 20 {
                select {
-               case cm := <-consumer.Chan():
-                       msg := string(cm.Payload())
-                       if strings.HasPrefix(msg, "topic-1") {
-                               receivedTopic1++
-                       } else if strings.HasPrefix(msg, "topic-2") {
-                               receivedTopic2++
+               case cm, ok := <-consumer.Chan():
+                       if ok {
+                               msg := string(cm.Payload())
+                               if strings.HasPrefix(msg, "topic-1") {
+                                       receivedTopic1++
+                               } else if strings.HasPrefix(msg, "topic-2") {
+                                       receivedTopic2++
+                               }
+                               consumer.Ack(cm.Message)
+                       } else {
+                               t.Fail()
                        }
-                       consumer.Ack(cm.Message)
                }
        }
        assert.Equal(t, receivedTopic1, receivedTopic2)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 81cb9ad..55d1318 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -179,9 +179,9 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
        msgIds := req.msgIds
        pc.log.Debug("Request redelivery after negative ack for messages", 
msgIds)
 
-       msgIdDataList := make([]*pb.MessageIdData, len(msgIds))
+       msgIDDataList := make([]*pb.MessageIdData, len(msgIds))
        for i := 0; i < len(msgIds); i++ {
-               msgIdDataList[i] = &pb.MessageIdData{
+               msgIDDataList[i] = &pb.MessageIdData{
                        LedgerId: proto.Uint64(uint64(msgIds[i].ledgerID)),
                        EntryId:  proto.Uint64(uint64(msgIds[i].entryID)),
                }
@@ -190,7 +190,7 @@ func (pc *partitionConsumer) internalRedeliver(req *redeliveryRequest) {
        pc.client.rpcClient.RequestOnCnxNoWait(pc.conn,
                pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
                        ConsumerId: proto.Uint64(pc.consumerID),
-                       MessageIds: msgIdDataList,
+                       MessageIds: msgIDDataList,
                })
 }
 
@@ -207,12 +207,12 @@ func (pc *partitionConsumer) Close() {
 }
 
 func (pc *partitionConsumer) internalAck(req *ackRequest) {
-       msgId := req.msgID
+       msgID := req.msgID
 
        messageIDs := make([]*pb.MessageIdData, 1)
        messageIDs[0] = &pb.MessageIdData{
-               LedgerId: proto.Uint64(uint64(msgId.ledgerID)),
-               EntryId:  proto.Uint64(uint64(msgId.entryID)),
+               LedgerId: proto.Uint64(uint64(msgID.ledgerID)),
+               EntryId:  proto.Uint64(uint64(msgID.entryID)),
        }
 
        cmdAck := &pb.CommandAck{
@@ -565,7 +565,7 @@ func (pc *partitionConsumer) grabConn() error {
 func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload internal.Buffer) (internal.Buffer, error) {
        provider, ok := compressionProviders[msgMeta.GetCompression()]
        if !ok {
-               err := fmt.Errorf("Unsupported compression type: %v", msgMeta.GetCompression())
+               err := fmt.Errorf("unsupported compression type: %v", msgMeta.GetCompression())
                pc.log.WithError(err).Error("Failed to decompress message.")
                return nil, err
        }
@@ -573,21 +573,22 @@ func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload int
        uncompressed, err := provider.Decompress(payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
        if err != nil {
                return nil, err
-       } else {
-               return internal.NewBufferWrapper(uncompressed), nil
        }
+
+       return internal.NewBufferWrapper(uncompressed), nil
 }
 
-func (pc *partitionConsumer) discardCorruptedMessage(msgId *pb.MessageIdData, validationError pb.CommandAck_ValidationError) {
+func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
+       validationError pb.CommandAck_ValidationError) {
        pc.log.WithFields(log.Fields{
-               "msgId":           msgId,
+               "msgId":           msgID,
                "validationError": validationError,
        }).Error("Discarding corrupted message")
 
        pc.client.rpcClient.RequestOnCnxNoWait(pc.conn,
                pb.BaseCommand_ACK, &pb.CommandAck{
                        ConsumerId:      proto.Uint64(pc.consumerID),
-                       MessageId:       []*pb.MessageIdData{msgId},
+                       MessageId:       []*pb.MessageIdData{msgID},
                        AckType:         pb.CommandAck_Individual.Enum(),
                        ValidationError: validationError.Enum(),
                })
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index e68c495..1d2f157 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -69,7 +69,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p
                namespace: tn.Namespace,
                pattern:   pattern,
 
-               consumers:     make(map[string]Consumer, 0),
+               consumers:     make(map[string]Consumer),
                subscribeCh:   make(chan []string, 1),
                unsubscribeCh: make(chan []string, 1),
 
@@ -268,7 +268,7 @@ func (c *regexConsumer) knownTopics() []string {
        defer c.consumersLock.Unlock()
        topics := make([]string, len(c.consumers))
        n := 0
-       for t, _ := range c.consumers {
+       for t := range c.consumers {
                topics[n] = t
                n++
        }
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index 97c0a55..287878c 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -123,7 +123,7 @@ func TestTopicsDiff(t *testing.T) {
        assert.Equal(t, []string{}, topicsDiff(topics1, topics2))
 }
 
-func runWithClientNamespace(t *testing.T, fn func(*testing.T, Client, string)) func(*testing.T) {
+func runWithClientNamespace(fn func(*testing.T, Client, string)) func(*testing.T) {
        return func(t *testing.T) {
                ns := fmt.Sprintf("public/%s", generateRandomName())
                err := createNamespace(ns, anonymousNamespacePolicy())
@@ -141,8 +141,8 @@ func runWithClientNamespace(t *testing.T, fn func(*testing.T, Client, string)) f
 }
 
 func TestRegexConsumerDiscover(t *testing.T) {
-       t.Run("PatternAll", runWithClientNamespace(t, runRegexConsumerDiscoverPatternAll))
-       t.Run("PatternFoo", runWithClientNamespace(t, runRegexConsumerDiscoverPatternFoo))
+       t.Run("PatternAll", runWithClientNamespace(runRegexConsumerDiscoverPatternAll))
+       t.Run("PatternFoo", runWithClientNamespace(runRegexConsumerDiscoverPatternFoo))
 }
 
 func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string) {
@@ -241,6 +241,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
 
        // delete the topic
        err = deleteTopic(fooTopic)
+       assert.Nil(t, err)
 
        rc.discover()
        time.Sleep(300 * time.Millisecond)
@@ -250,8 +251,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
 }
 
 func TestRegexConsumer(t *testing.T) {
-       t.Run("MatchOneTopic", runWithClientNamespace(t, runRegexConsumerMatchOneTopic))
-       t.Run("AddTopic", runWithClientNamespace(t, runRegexConsumerAddMatchingTopic))
+       t.Run("MatchOneTopic", runWithClientNamespace(runRegexConsumerMatchOneTopic))
+       t.Run("AddTopic", runWithClientNamespace(runRegexConsumerAddMatchingTopic))
 }
 
 func runRegexConsumerMatchOneTopic(t *testing.T, c Client, namespace string) {
diff --git a/pulsar/error.go b/pulsar/error.go
index bd7e037..b98ce88 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -38,9 +38,11 @@ const (
        // ResultConnectError means failed to connect to broker
        ResultConnectError
 
-       //ReadError                      Result = 6  // Failed to read from socket
-       //AuthenticationError            Result = 7  // Authentication failed on broker
-       //AuthorizationError             Result = 8  // Client is not authorized to create producer/consumer
+       // ReadError means failed to read from socket
+       //ReadError                      Result = 6
+       // AuthenticationError means authentication failed on broker
+       //AuthenticationError            Result = 7
+       //AuthorizationError             Result = 8
        //ErrorGettingAuthenticationData Result = 9  // Client cannot find authorization data
        //BrokerMetadataError            Result = 10 // Broker failed in updating metadata
        //BrokerPersistenceError         Result = 11 // Broker failed to persist entry
@@ -53,8 +55,10 @@ const (
        //ConsumerNotInitialized         Result = 17 // Consumer is not initialized
        //ProducerNotInitialized         Result = 18 // Producer is not initialized
        //TooManyLookupRequestException  Result = 19 // Too Many concurrent LookupRequest
-       //InvalidUrl                            Result = 21 // Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
-       //ServiceUnitNotReady                   Result = 22 // Service Unit unloaded between client did lookup and producer/consumer got created
+       // InvalidUrl means Client Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
+       //InvalidUrl                            Result = 21
+       // ServiceUnitNotReady unloaded between client did lookup and producer/consumer got created
+       //ServiceUnitNotReady                   Result = 22
        //OperationNotSupported                 Result = 23
        //ProducerBlockedQuotaExceededError     Result = 24 // Producer is blocked
        //ProducerBlockedQuotaExceededException Result = 25 // Producer is getting exception
@@ -63,7 +67,8 @@ const (
        TopicNotFound        Result = 28 // Topic not found
        SubscriptionNotFound Result = 29 // Subscription not found
        //ConsumerNotFound                      Result = 30 // Consumer not found
-       //UnsupportedVersionError               Result = 31 // Error when an older client/version doesn't support a required feature
+       // UnsupportedVersionError when an older client/version doesn't support a required feature
+       //UnsupportedVersionError               Result = 31
        //TopicTerminated                       Result = 32 // Topic was already terminated
        //CryptoError                           Result = 33 // Error when crypto operation fails
 )
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index cbd5c5d..ed2505a 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -114,12 +114,6 @@ func timeFromUnixTimestampMillis(timestamp uint64) time.Time {
        return time.Unix(seconds, nanos)
 }
 
-func timeToUnixTimestampMillis(t time.Time) uint64 {
-       nanos := t.UnixNano()
-       millis := nanos / int64(time.Millisecond)
-       return uint64(millis)
-}
-
 type message struct {
        publishTime time.Time
        eventTime   time.Time
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index e4ba3c1..54c3bee 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -75,8 +75,8 @@ func TestAckTracker(t *testing.T) {
 
 func TestAckingMessageIDBatchOne(t *testing.T) {
        tracker := newAckTracker(1)
-       msgId := newTrackingMessageID(1, 1, 0, 0, tracker)
-       assert.Equal(t, true, msgId.ack())
+       msgID := newTrackingMessageID(1, 1, 0, 0, tracker)
+       assert.Equal(t, true, msgID.ack())
        assert.Equal(t, true, tracker.completed())
 }
 
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 4b19438..33a1655 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -104,7 +104,7 @@ func (bb *BatchBuilder) hasSpace(payload []byte) bool {
 }
 
 // Add will add single message to batch.
-func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint64, payload []byte,
+func (bb *BatchBuilder) Add(metadata proto.Message, sequenceID uint64, payload []byte,
        callback interface{}, replicateTo []string) bool {
        if replicateTo != nil && bb.numMessages != 0 {
                // If the current batch is not empty and we're trying to set the replication clusters,
diff --git a/pulsar/internal/blocking_queue_test.go b/pulsar/internal/blocking_queue_test.go
index bf21da5..12bb8fc 100644
--- a/pulsar/internal/blocking_queue_test.go
+++ b/pulsar/internal/blocking_queue_test.go
@@ -55,7 +55,7 @@ func TestBlockingQueue(t *testing.T) {
        time.Sleep(100 * time.Millisecond)
 
        select {
-       case _ = <-ch:
+       case <-ch:
                assert.Fail(t, "Should not have had a value at this point")
        default:
                // Good, no value yet
@@ -100,7 +100,7 @@ func TestBlockingQueueWaitWhenFull(t *testing.T) {
        time.Sleep(100 * time.Millisecond)
 
        select {
-       case _ = <-ch:
+       case <-ch:
                assert.Fail(t, "Should not have had a value at this point")
        default:
                // Good, no value yet
@@ -109,7 +109,7 @@ func TestBlockingQueueWaitWhenFull(t *testing.T) {
        assert.Equal(t, "test-1", q.Poll())
 
        // Now the go-routine should have completed
-       _ = <-ch
+       <-ch
        assert.Equal(t, 3, q.Size())
 
        assert.Equal(t, "test-2", q.Take())
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 874d4cd..59a3981 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -60,7 +60,8 @@ func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader {
 // [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
 //
 // Batch format
-// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]
+// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD]
+// [METADATA_SIZE][METADATA][PAYLOAD]
 //
 type MessageReader struct {
        buffer Buffer
@@ -191,7 +192,7 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg proto.Message) *pb.BaseCommand
        return cmd
 }
 
-func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload []byte) {
+func addSingleMessageToBatch(wb Buffer, smm proto.Message, payload []byte) {
        serialized, err := proto.Marshal(smm)
        if err != nil {
                log.WithError(err).Fatal("Protobuf serialization error")
@@ -202,7 +203,7 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
        wb.Write(payload)
 }
 
-func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, payload []byte) {
+func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message, payload []byte) {
        // Wire format
        // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
        cmdSize := proto.Size(cmdSend)
diff --git a/pulsar/internal/compression/zstd.go b/pulsar/internal/compression/zstd.go
index b385c34..41fba0b 100644
--- a/pulsar/internal/compression/zstd.go
+++ b/pulsar/internal/compression/zstd.go
@@ -19,6 +19,7 @@ package compression
 
 import (
        "bytes"
+
        "github.com/klauspost/compress/zstd"
        "github.com/pkg/errors"
 )
@@ -41,7 +42,7 @@ func (p *zstdProvider) Compress(data []byte) []byte {
        return p.encoder.EncodeAll(data, []byte{})
 }
 
-func (p* zstdProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
+func (p *zstdProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
        d, err := zstd.NewReader(bytes.NewReader(compressedData))
        if err != nil {
                return nil, err
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index a42be14..7a7cd5f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -299,7 +299,7 @@ func (c *connection) run() {
 
        for {
                select {
-               case <- c.closeCh:
+               case <-c.closeCh:
                        c.Close()
                        return
 
@@ -318,7 +318,7 @@ func (c *connection) run() {
                        }
                        c.internalWriteData(data)
 
-               case _ = <-c.pingTicker.C:
+               case <-c.pingTicker.C:
                        c.sendPing()
                }
        }
@@ -327,9 +327,9 @@ func (c *connection) run() {
 func (c *connection) runPingCheck() {
        for {
                select {
-               case <- c.closeCh:
+               case <-c.closeCh:
                        return
-               case _ = <-c.pingCheckTicker.C:
+               case <-c.pingCheckTicker.C:
                        if c.lastDataReceived().Add(2 * keepAliveInterval).Before(time.Now()) {
                                // We have not received a response to the previous Ping request, the
                                // connection to broker is stale
@@ -442,7 +442,8 @@ func (c *connection) Write(data []byte) {
        c.writeRequestsCh <- data
 }
 
-func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand, callback func(command *pb.BaseCommand, err error)) {
+func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
+       callback func(command *pb.BaseCommand, err error)) {
        c.incomingRequestsCh <- &request{
                id:       &requestID,
                cmd:      req,
@@ -487,8 +488,8 @@ func (c *connection) handleResponseError(serverError *pb.CommandError) {
 
        delete(c.pendingReqs, requestID)
 
-       request.callback(nil,
-               errors.New(fmt.Sprintf("server error: %s: %s", serverError.GetError(), serverError.GetMessage())))
+       errMsg := fmt.Sprintf("server error: %s: %s", serverError.GetError(), serverError.GetMessage())
+       request.callback(nil, errors.New(errMsg))
 }
 
 func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
@@ -583,8 +584,8 @@ func (c *connection) TriggerClose() {
        }
 
        select {
-               case <- c.closeCh:
-                       return
+       case <-c.closeCh:
+               return
        default:
                close(c.closeCh)
        }
@@ -693,7 +694,6 @@ func (c *connection) consumerHandler(id uint64) (ConsumerHandler, bool) {
        return h, ok
 }
 
-func (c *connection) ID() (string) {
+func (c *connection) ID() string {
        return fmt.Sprintf("%s -> %s", c.cnx.LocalAddr(), c.cnx.RemoteAddr())
 }
-
diff --git a/pulsar/internal/default_router.go b/pulsar/internal/default_router.go
index 6626351..e4d31ca 100644
--- a/pulsar/internal/default_router.go
+++ b/pulsar/internal/default_router.go
@@ -40,7 +40,8 @@ func NewSystemClock() Clock {
 
 // NewDefaultRouter set the message routing mode for the partitioned producer.
 // Default routing mode is round-robin routing.
-func NewDefaultRouter(clock Clock, hashFunc func(string) uint32, maxBatchingDelay time.Duration) func(string, uint32) int {
+func NewDefaultRouter(clock Clock, hashFunc func(string) uint32,
+       maxBatchingDelay time.Duration) func(string, uint32) int {
        state := &defaultRouter{
                clock:            clock,
                shiftIdx:         rand.Uint32(),
diff --git a/pulsar/internal/lookup_service.go b/pulsar/internal/lookup_service.go
index 0b54c07..ee6dffe 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -54,7 +54,8 @@ func NewLookupService(rpcClient RPCClient, serviceURL *url.URL) LookupService {
        }
 }
 
-func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (logicalAddress *url.URL, physicalAddress *url.URL, err error) {
+func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse) (logicalAddress *url.URL,
+       physicalAddress *url.URL, err error) {
        logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrl())
        if err != nil {
                return nil, nil, err
diff --git a/pulsar/internal/lookup_service_test.go b/pulsar/internal/lookup_service_test.go
index 9a9f053..5bb1724 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -49,7 +49,8 @@ func (c *mockedRPCClient) NewConsumerID() uint64 {
        return 1
 }
 
-func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
+func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
+       message proto.Message) (*RPCResult, error) {
        assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
 
        expectedRequest := &c.expectedRequests[0]
@@ -90,7 +91,8 @@ func (c *mockedRPCClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, r
        }, nil
 }
 
-func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
+func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type,
+       message proto.Message) (*RPCResult, error) {
        assert.Fail(c.t, "Shouldn't be called")
        return nil, nil
 }
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index aa89b59..23c3b61 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -69,7 +69,8 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool, requestTimeout time.
        }
 }
 
-func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
+func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type,
+       message proto.Message) (*RPCResult, error) {
        return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType, message)
 }
 
@@ -81,7 +82,10 @@ func (c *rpcClient) Request(logicalAddr *url.URL, physicalAddr *url.URL, request
                return nil, err
        }
 
-       type Res struct {*RPCResult; error}
+       type Res struct {
+               *RPCResult
+               error
+       }
        ch := make(chan Res)
 
        // TODO: Handle errors with disconnections
@@ -110,7 +114,7 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.Ba
                Cnx: cnx,
        }
 
-       var rpcErr error = nil
+       var rpcErr error
        cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response *pb.BaseCommand, err error) {
                rpcResult.Response = response
                rpcErr = err
diff --git a/pulsar/message.go b/pulsar/message.go
index dd4fcff..cbb3d63 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -59,7 +59,8 @@ type Message interface {
        // The message id can be used to univocally refer to a message without having the keep the entire payload in memory.
        ID() MessageID
 
-       // PublishTime get the publish time of this message. The publish time is the timestamp that a client publish the message.
+       // PublishTime get the publish time of this message. The publish time is the timestamp that a client
+       // publish the message.
        PublishTime() time.Time
 
        // EventTime get the event time associated with this message. It is typically set by the applications via
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index e8a79e8..f887844 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -18,9 +18,10 @@
 package pulsar
 
 import (
-       log "github.com/sirupsen/logrus"
        "sync"
        "time"
+
+       log "github.com/sirupsen/logrus"
 )
 
 type redeliveryConsumer interface {
@@ -53,7 +54,7 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration) *negativ
 func (t *negativeAcksTracker) Add(msgID *messageID) {
        // Always clear up the batch index since we want to track the nack
        // for the entire batch
-       batchMsgId := messageID{
+       batchMsgID := messageID{
                ledgerID: msgID.ledgerID,
                entryID:  msgID.entryID,
                batchIdx: 0,
@@ -62,14 +63,14 @@ func (t *negativeAcksTracker) Add(msgID *messageID) {
        t.Lock()
        defer t.Unlock()
 
-       _, present := t.negativeAcks[batchMsgId]
+       _, present := t.negativeAcks[batchMsgID]
        if present {
                // The batch is already being tracked
                return
-       } else {
-               targetTime := time.Now().Add(t.delay)
-               t.negativeAcks[batchMsgId] = targetTime
        }
+
+       targetTime := time.Now().Add(t.delay)
+       t.negativeAcks[batchMsgID] = targetTime
 }
 
 func (t *negativeAcksTracker) track() {
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index 93c1af2..f7a1c50 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -32,7 +32,6 @@ type nackMockedConsumer struct {
        ch     chan messageID
        closed bool
        lock   sync.Mutex
-       msgIds []messageID
 }
 
 func newNackMockedConsumer() *nackMockedConsumer {
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 6a68e7b..46103b5 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -63,7 +63,8 @@ type ProducerOptions struct {
        // This properties will be visible in the topic stats
        Properties map[string]string
 
-       // MaxPendingMessages set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
+       // MaxPendingMessages set the max size of the queue holding the messages pending to receive an
+       // acknowledgment from the broker.
        MaxPendingMessages int
 
        // HashingScheme change the `HashingScheme` used to chose the partition on where to publish a particular message.
@@ -101,12 +102,14 @@ type ProducerOptions struct {
        // Setting `DisableBatching: true` will make the producer to send messages individually
        DisableBatching bool
 
-       // BatchingMaxPublishDelay set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
-       // enabled. If set to a non zero value, messages will be queued until this time interval or until
+       // BatchingMaxPublishDelay set the time period within which the messages sent will be batched (default: 10ms)
+       // if batch messages are enabled. If set to a non zero value, messages will be queued until this time
+       // interval or until
        BatchingMaxPublishDelay time.Duration
 
-       // BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
-       // messages will be queued until this threshold is reached or batch interval has elapsed
+       // BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000)
+       // If set to a value greater than 1, messages will be queued until this threshold is reached or
+       // batch interval has elapsed.
        BatchingMaxMessages uint
 }
 
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index ce97c22..101929d 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -129,7 +129,8 @@ func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, e
        return p.producers[partition].Send(ctx, msg)
 }
 
-func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error)) {
+func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
+       callback func(MessageID, *ProducerMessage, error)) {
        partition := p.messageRouter(msg, p)
        p.producers[partition].SendAsync(ctx, msg, callback)
 }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 47dff85..f96a706 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -66,8 +66,8 @@ type partitionProducer struct {
 
 const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
 
-func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int) (*partitionProducer, error) {
-
+func newPartitionProducer(client *client, topic string, options *ProducerOptions, partitionIdx int) (
+       *partitionProducer, error) {
        var batchingMaxPublishDelay time.Duration
        if options.BatchingMaxPublishDelay != 0 {
                batchingMaxPublishDelay = options.BatchingMaxPublishDelay
@@ -126,11 +126,11 @@ func (p *partitionProducer) grabCnx() error {
        p.log.Debug("Lookup result: ", lr)
        id := p.client.rpcClient.NewRequestID()
        cmdProducer := &pb.CommandProducer{
-               RequestId:    proto.Uint64(id),
-               Topic:        proto.String(p.topic),
-               Encrypted:    nil,
-               ProducerId:   proto.Uint64(p.producerID),
-               Schema:       nil,
+               RequestId:  proto.Uint64(id),
+               Topic:      proto.String(p.topic),
+               Encrypted:  nil,
+               ProducerId: proto.Uint64(p.producerID),
+               Schema:     nil,
        }
 
        if p.producerName != "" {
@@ -218,7 +218,7 @@ func (p *partitionProducer) runEventsLoop() {
                                return
                        }
 
-               case _ = <-p.batchFlushTicker.C:
+               case <-p.batchFlushTicker.C:
                        p.internalFlushCurrentBatch()
                }
        }
@@ -341,11 +341,11 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
        wg.Add(1)
 
        var err error
-       var msgId MessageID
+       var msgID MessageID
 
        p.internalSendAsync(ctx, msg, func(ID MessageID, message *ProducerMessage, e error) {
                err = e
-               msgId = ID
+               msgID = ID
                wg.Done()
        }, true)
 
@@ -356,7 +356,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (Mes
        }
 
        wg.Wait()
-       return msgId, err
+       return msgID, err
 }
 
 func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 189902c..a387d3d 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -20,12 +20,12 @@ package pulsar
 import (
        "context"
        "fmt"
-       "github.com/apache/pulsar-client-go/pulsar/internal"
        "net/http"
        "sync"
        "testing"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/stretchr/testify/assert"
 
        log "github.com/sirupsen/logrus"
@@ -384,6 +384,7 @@ func TestFlushInPartitionedProducer(t *testing.T) {
                BatchingMaxMessages:     uint(numOfMessages / numberOfPartitions),
                BatchingMaxPublishDelay: time.Second * 10,
        })
+       assert.Nil(t, err)
        defer producer.Close()
 
        // send 5 messages
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index d8cdc6d..3471bca 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -21,7 +21,6 @@ import (
        "bytes"
        "encoding/json"
        "fmt"
-       "github.com/apache/pulsar-client-go/pulsar/internal"
        "io"
        "io/ioutil"
        "net/http"
@@ -30,6 +29,8 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/internal"
+
        pkgerrors "github.com/pkg/errors"
 )
 
@@ -141,10 +142,6 @@ func createNamespace(namespace string, policy map[string]interface{}) error {
        return httpPut("admin/v2/namespaces/"+namespace, policy)
 }
 
-func deleteNamespace(namespace string) error {
-       return httpDelete("admin/v2/namespaces/" + namespace)
-}
-
 func createTopic(topic string) error {
        return httpPut("admin/v2/persistent/"+topic, nil)
 }
diff --git a/run-ci.sh b/run-ci.sh
index 23896f4..2de0eee 100755
--- a/run-ci.sh
+++ b/run-ci.sh
@@ -31,7 +31,7 @@ go build -o pulsar-perf ./perf
 
 ./pulsar-test-service-start.sh
 
-go test -race -coverprofile=/tmp/coverage -timeout=1h ./...
+go test -race -coverprofile=/tmp/coverage -timeout=20m ./...
 go tool cover -html=/tmp/coverage -o coverage.html
 
 ./pulsar-test-service-stop.sh
