This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 5004b5d Enable golangci lint for project (#138)
5004b5d is described below
commit 5004b5d81001f7276091b11699cf568778d83556
Author: 冉小龙 <[email protected]>
AuthorDate: Fri Dec 27 10:51:34 2019 +0800
Enable golangci lint for project (#138)
### Modifications
Enable golangci lint for project, checks as follows:
```
- bodyclose
- deadcode
- gocritic
- goimports
- golint
- gosimple
- govet
- ineffassign
- interfacer
- misspell
- staticcheck
- structcheck
- stylecheck
- typecheck
- unconvert
- unparam
- unused
- varcheck
- lll
- prealloc
- maligned
```
---
.gitignore | 2 ++
.golangci.yml | 44 ++++++++++++++++------------------
docker-ci.sh | 5 +++-
perf/perf-producer.go | 3 ++-
perf/pulsar-perf-go.go | 2 +-
pulsar/client_impl.go | 12 +++-------
pulsar/client_impl_test.go | 14 ++++++-----
pulsar/consumer.go | 4 ++--
pulsar/consumer_impl.go | 3 +--
pulsar/consumer_multitopic_test.go | 23 +++++++++++-------
pulsar/consumer_partition.go | 25 +++++++++----------
pulsar/consumer_regex.go | 4 ++--
pulsar/consumer_regex_test.go | 11 +++++----
pulsar/error.go | 17 ++++++++-----
pulsar/impl_message.go | 6 -----
pulsar/impl_message_test.go | 4 ++--
pulsar/internal/batch_builder.go | 2 +-
pulsar/internal/blocking_queue_test.go | 6 ++---
pulsar/internal/commands.go | 7 +++---
pulsar/internal/compression/zstd.go | 3 ++-
pulsar/internal/connection.go | 22 ++++++++---------
pulsar/internal/default_router.go | 3 ++-
pulsar/internal/lookup_service.go | 3 ++-
pulsar/internal/lookup_service_test.go | 6 +++--
pulsar/internal/rpc_client.go | 10 +++++---
pulsar/message.go | 3 ++-
pulsar/negative_acks_tracker.go | 13 +++++-----
pulsar/negative_acks_tracker_test.go | 1 -
pulsar/producer.go | 13 ++++++----
pulsar/producer_impl.go | 3 ++-
pulsar/producer_partition.go | 22 ++++++++---------
pulsar/producer_test.go | 3 ++-
pulsar/test_helper.go | 7 ++----
run-ci.sh | 2 +-
34 files changed, 163 insertions(+), 145 deletions(-)
diff --git a/.gitignore b/.gitignore
index 7daaebc..bc14ef8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,5 @@
*.out
perf/perf
+pulsar-perf
+bin
diff --git a/.golangci.yml b/.golangci.yml
index 272cc9a..18560b2 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -2,29 +2,27 @@
# https://github.com/golangci/golangci-lint#install
# We can execute `golangci-lint run` for code checking.
run:
- tests: true
- skip-dirs:
- - _examples
-
-output:
- print-issued-lines: false
+ deadline: 6m
linters:
- enable-all: true
- disable:
- - maligned
- - megacheck
+ disable-all: true
+ enable:
+ - bodyclose
+ - deadcode
+ - goimports
+ - golint
+ - gosimple
+ - govet
+ - ineffassign
+ - interfacer
+ - misspell
+ - staticcheck
+ - structcheck
+ - stylecheck
+ - typecheck
+ - unconvert
+ - unparam
+ - varcheck
- lll
- - gocyclo
- - gochecknoglobals
-
-linters-settings:
- govet:
- check-shadowing: true
- gocyclo:
- min-complexity: 10
- dupl:
- threshold: 100
- goconst:
- min-len: 8
- min-occurrences: 3
+ - prealloc
+ - maligned
diff --git a/docker-ci.sh b/docker-ci.sh
index 81486cf..f6924db 100755
--- a/docker-ci.sh
+++ b/docker-ci.sh
@@ -20,6 +20,9 @@
set -e -x
+# Project code style check
+docker run --rm -v $(pwd):/app -w /app golangci/golangci-lint:v1.21.0
golangci-lint run -v -c ./.golangci.yml ./...
+
SRC_DIR=$(git rev-parse --show-toplevel)
cd ${SRC_DIR}
@@ -28,4 +31,4 @@ IMAGE_NAME=pulsar-client-go-test:latest
docker build -t ${IMAGE_NAME} .
docker run -i -v ${PWD}:/pulsar-client-go ${IMAGE_NAME} \
- bash -c "cd /pulsar-client-go && ./run-ci.sh"
\ No newline at end of file
+ bash -c "cd /pulsar-client-go && ./run-ci.sh"
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 035285b..5660a80 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -141,7 +141,8 @@ func produce(produceArgs *ProduceArgs, stop <-chan
struct{}) {
return
case <-tick.C:
messageRate := float64(messagesPublished) / float64(10)
- log.Infof(`Stats - Publish rate: %6.1f msg/s - %6.1f
Mbps - Latency ms: 50%% %5.1f - 95%% %5.1f - 99%% %5.1f - 99.9%% %5.1f - max
%6.1f`,
+ log.Infof(`Stats - Publish rate: %6.1f msg/s - %6.1f
Mbps -
+ Latency ms: 50%% %5.1f -95%% %5.1f - 99%% %5.1f
- 99.9%% %5.1f - max %6.1f`,
messageRate,
messageRate*float64(produceArgs.MessageSize)/1024/1024*8,
q.Query(0.5)*1000,
diff --git a/perf/pulsar-perf-go.go b/perf/pulsar-perf-go.go
index f75e5d0..3ceff71 100644
--- a/perf/pulsar-perf-go.go
+++ b/perf/pulsar-perf-go.go
@@ -32,7 +32,7 @@ import (
"github.com/apache/pulsar-client-go/pulsar"
)
-// global flags
+// FlagProfile is a global flag
var FlagProfile bool
var flagDebug bool
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 7873b56..04d2d2b 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -33,21 +33,15 @@ import (
)
const (
- defaultConnectionTimeout = 30*time.Second
- defaultOperationTimeout = 30*time.Second
+ defaultConnectionTimeout = 30 * time.Second
+ defaultOperationTimeout = 30 * time.Second
)
type client struct {
- options ClientOptions
-
cnxPool internal.ConnectionPool
rpcClient internal.RPCClient
+ handlers internal.ClientHandlers
lookupService internal.LookupService
- auth auth.Provider
-
- handlers internal.ClientHandlers
- producerIDGenerator uint64
- consumerIDGenerator uint64
}
func newClient(options ClientOptions) (Client, error) {
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 841069a..8917c46 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -19,9 +19,10 @@ package pulsar
import (
"fmt"
- "github.com/stretchr/testify/assert"
"io/ioutil"
"testing"
+
+ "github.com/stretchr/testify/assert"
)
func TestClient(t *testing.T) {
@@ -176,7 +177,7 @@ func TestTokenAuth(t *testing.T) {
func TestTokenAuthWithSupplier(t *testing.T) {
client, err := NewClient(ClientOptions{
- URL: serviceURL,
+ URL: serviceURL,
Authentication: NewAuthenticationTokenFromSupplier(func() (s
string, err error) {
token, err := ioutil.ReadFile(tokenFilePath)
if err != nil {
@@ -224,8 +225,8 @@ func TestTopicPartitions(t *testing.T) {
defer client.Close()
// Create topic with 5 partitions
-
httpPut("admin/v2/persistent/public/default/TestGetTopicPartitions/partitions",
- 5)
+ err =
httpPut("admin/v2/persistent/public/default/TestGetTopicPartitions/partitions",
5)
+ assert.Nil(t, err)
partitionedTopic := "persistent://public/default/TestGetTopicPartitions"
@@ -260,14 +261,15 @@ func TestNamespaceTopicsNamespaceDoesNotExit(t
*testing.T) {
// fetch from namespace that does not exist
name := generateRandomName()
topics, err := ci.namespaceTopics(fmt.Sprintf("%s/%s", name, name))
+ assert.Nil(t, err)
assert.Equal(t, 0, len(topics))
}
func TestNamespaceTopics(t *testing.T) {
name := generateRandomName()
namespace := fmt.Sprintf("public/%s", name)
- namespaceUrl := fmt.Sprintf("admin/v2/namespaces/%s", namespace)
- err := httpPut(namespaceUrl, anonymousNamespacePolicy())
+ namespaceURL := fmt.Sprintf("admin/v2/namespaces/%s", namespace)
+ err := httpPut(namespaceURL, anonymousNamespacePolicy())
if err != nil {
t.Fatal()
}
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 591cab5..a98eb2d 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -45,8 +45,8 @@ const (
// If that consumer disconnects, one of the other connected consumers
will start receiving messages.
Failover
- // KeyShared subscription mode, multiple consumer will be able to use
the same subscription and all messages with the same key
- // will be dispatched to only one consumer
+ // KeyShared subscription mode, multiple consumer will be able to use
the same
+ // subscription and all messages with the same key will be dispatched
to only one consumer
KeyShared
)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 4a1f85e..c962e52 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -41,8 +41,7 @@ type acker interface {
}
type consumer struct {
- options ConsumerOptions
-
+ options ConsumerOptions
consumers []*partitionConsumer
// channel used to deliver message to clients
diff --git a/pulsar/consumer_multitopic_test.go
b/pulsar/consumer_multitopic_test.go
index 6ac9da0..47704f9 100644
--- a/pulsar/consumer_multitopic_test.go
+++ b/pulsar/consumer_multitopic_test.go
@@ -54,7 +54,7 @@ func TestMultiTopicConsumerReceive(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- err = genMessages(p, 5, func(idx int) string {
+ err = genMessages(p, 10, func(idx int) string {
return fmt.Sprintf("topic-%d-hello-%d", i+1, idx)
})
p.Close()
@@ -65,16 +65,21 @@ func TestMultiTopicConsumerReceive(t *testing.T) {
receivedTopic1 := 0
receivedTopic2 := 0
- for receivedTopic1+receivedTopic2 < 10 {
+ // nolint
+ for receivedTopic1+receivedTopic2 < 20 {
select {
- case cm := <-consumer.Chan():
- msg := string(cm.Payload())
- if strings.HasPrefix(msg, "topic-1") {
- receivedTopic1++
- } else if strings.HasPrefix(msg, "topic-2") {
- receivedTopic2++
+ case cm, ok := <-consumer.Chan():
+ if ok {
+ msg := string(cm.Payload())
+ if strings.HasPrefix(msg, "topic-1") {
+ receivedTopic1++
+ } else if strings.HasPrefix(msg, "topic-2") {
+ receivedTopic2++
+ }
+ consumer.Ack(cm.Message)
+ } else {
+ t.Fail()
}
- consumer.Ack(cm.Message)
}
}
assert.Equal(t, receivedTopic1, receivedTopic2)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 81cb9ad..55d1318 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -179,9 +179,9 @@ func (pc *partitionConsumer) internalRedeliver(req
*redeliveryRequest) {
msgIds := req.msgIds
pc.log.Debug("Request redelivery after negative ack for messages",
msgIds)
- msgIdDataList := make([]*pb.MessageIdData, len(msgIds))
+ msgIDDataList := make([]*pb.MessageIdData, len(msgIds))
for i := 0; i < len(msgIds); i++ {
- msgIdDataList[i] = &pb.MessageIdData{
+ msgIDDataList[i] = &pb.MessageIdData{
LedgerId: proto.Uint64(uint64(msgIds[i].ledgerID)),
EntryId: proto.Uint64(uint64(msgIds[i].entryID)),
}
@@ -190,7 +190,7 @@ func (pc *partitionConsumer) internalRedeliver(req
*redeliveryRequest) {
pc.client.rpcClient.RequestOnCnxNoWait(pc.conn,
pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES,
&pb.CommandRedeliverUnacknowledgedMessages{
ConsumerId: proto.Uint64(pc.consumerID),
- MessageIds: msgIdDataList,
+ MessageIds: msgIDDataList,
})
}
@@ -207,12 +207,12 @@ func (pc *partitionConsumer) Close() {
}
func (pc *partitionConsumer) internalAck(req *ackRequest) {
- msgId := req.msgID
+ msgID := req.msgID
messageIDs := make([]*pb.MessageIdData, 1)
messageIDs[0] = &pb.MessageIdData{
- LedgerId: proto.Uint64(uint64(msgId.ledgerID)),
- EntryId: proto.Uint64(uint64(msgId.entryID)),
+ LedgerId: proto.Uint64(uint64(msgID.ledgerID)),
+ EntryId: proto.Uint64(uint64(msgID.entryID)),
}
cmdAck := &pb.CommandAck{
@@ -565,7 +565,7 @@ func (pc *partitionConsumer) grabConn() error {
func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload
internal.Buffer) (internal.Buffer, error) {
provider, ok := compressionProviders[msgMeta.GetCompression()]
if !ok {
- err := fmt.Errorf("Unsupported compression type: %v",
msgMeta.GetCompression())
+ err := fmt.Errorf("unsupported compression type: %v",
msgMeta.GetCompression())
pc.log.WithError(err).Error("Failed to decompress message.")
return nil, err
}
@@ -573,21 +573,22 @@ func (pc *partitionConsumer) Decompress(msgMeta
*pb.MessageMetadata, payload int
uncompressed, err := provider.Decompress(payload.ReadableSlice(),
int(msgMeta.GetUncompressedSize()))
if err != nil {
return nil, err
- } else {
- return internal.NewBufferWrapper(uncompressed), nil
}
+
+ return internal.NewBufferWrapper(uncompressed), nil
}
-func (pc *partitionConsumer) discardCorruptedMessage(msgId *pb.MessageIdData,
validationError pb.CommandAck_ValidationError) {
+func (pc *partitionConsumer) discardCorruptedMessage(msgID *pb.MessageIdData,
+ validationError pb.CommandAck_ValidationError) {
pc.log.WithFields(log.Fields{
- "msgId": msgId,
+ "msgId": msgID,
"validationError": validationError,
}).Error("Discarding corrupted message")
pc.client.rpcClient.RequestOnCnxNoWait(pc.conn,
pb.BaseCommand_ACK, &pb.CommandAck{
ConsumerId: proto.Uint64(pc.consumerID),
- MessageId: []*pb.MessageIdData{msgId},
+ MessageId: []*pb.MessageIdData{msgID},
AckType: pb.CommandAck_Individual.Enum(),
ValidationError: validationError.Enum(),
})
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index e68c495..1d2f157 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -69,7 +69,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn
*internal.TopicName, p
namespace: tn.Namespace,
pattern: pattern,
- consumers: make(map[string]Consumer, 0),
+ consumers: make(map[string]Consumer),
subscribeCh: make(chan []string, 1),
unsubscribeCh: make(chan []string, 1),
@@ -268,7 +268,7 @@ func (c *regexConsumer) knownTopics() []string {
defer c.consumersLock.Unlock()
topics := make([]string, len(c.consumers))
n := 0
- for t, _ := range c.consumers {
+ for t := range c.consumers {
topics[n] = t
n++
}
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index 97c0a55..287878c 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -123,7 +123,7 @@ func TestTopicsDiff(t *testing.T) {
assert.Equal(t, []string{}, topicsDiff(topics1, topics2))
}
-func runWithClientNamespace(t *testing.T, fn func(*testing.T, Client, string))
func(*testing.T) {
+func runWithClientNamespace(fn func(*testing.T, Client, string))
func(*testing.T) {
return func(t *testing.T) {
ns := fmt.Sprintf("public/%s", generateRandomName())
err := createNamespace(ns, anonymousNamespacePolicy())
@@ -141,8 +141,8 @@ func runWithClientNamespace(t *testing.T, fn
func(*testing.T, Client, string)) f
}
func TestRegexConsumerDiscover(t *testing.T) {
- t.Run("PatternAll", runWithClientNamespace(t,
runRegexConsumerDiscoverPatternAll))
- t.Run("PatternFoo", runWithClientNamespace(t,
runRegexConsumerDiscoverPatternFoo))
+ t.Run("PatternAll",
runWithClientNamespace(runRegexConsumerDiscoverPatternAll))
+ t.Run("PatternFoo",
runWithClientNamespace(runRegexConsumerDiscoverPatternFoo))
}
func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace
string) {
@@ -241,6 +241,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c
Client, namespace string
// delete the topic
err = deleteTopic(fooTopic)
+ assert.Nil(t, err)
rc.discover()
time.Sleep(300 * time.Millisecond)
@@ -250,8 +251,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c
Client, namespace string
}
func TestRegexConsumer(t *testing.T) {
- t.Run("MatchOneTopic", runWithClientNamespace(t,
runRegexConsumerMatchOneTopic))
- t.Run("AddTopic", runWithClientNamespace(t,
runRegexConsumerAddMatchingTopic))
+ t.Run("MatchOneTopic",
runWithClientNamespace(runRegexConsumerMatchOneTopic))
+ t.Run("AddTopic",
runWithClientNamespace(runRegexConsumerAddMatchingTopic))
}
func runRegexConsumerMatchOneTopic(t *testing.T, c Client, namespace string) {
diff --git a/pulsar/error.go b/pulsar/error.go
index bd7e037..b98ce88 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -38,9 +38,11 @@ const (
// ResultConnectError means failed to connect to broker
ResultConnectError
- //ReadError Result = 6 // Failed to read from
socket
- //AuthenticationError Result = 7 // Authentication failed
on broker
- //AuthorizationError Result = 8 // Client is not
authorized to create producer/consumer
+ // ReadError means failed to read from socket
+ //ReadError Result = 6
+ // AuthenticationError means authentication failed on broker
+ //AuthenticationError Result = 7
+ //AuthorizationError Result = 8
//ErrorGettingAuthenticationData Result = 9 // Client cannot find
authorization data
//BrokerMetadataError Result = 10 // Broker failed in
updating metadata
//BrokerPersistenceError Result = 11 // Broker failed to
persist entry
@@ -53,8 +55,10 @@ const (
//ConsumerNotInitialized Result = 17 // Consumer is not
initialized
//ProducerNotInitialized Result = 18 // Producer is not
initialized
//TooManyLookupRequestException Result = 19 // Too Many concurrent
LookupRequest
- //InvalidUrl Result = 21 // Client
Initialized with Invalid Broker Url (VIP Url passed to Client Constructor)
- //ServiceUnitNotReady Result = 22 // Service Unit
unloaded between client did lookup and producer/consumer got created
+ // InvalidUrl means Client Initialized with Invalid Broker Url (VIP Url
passed to Client Constructor)
+ //InvalidUrl Result = 21
+ // ServiceUnitNotReady unloaded between client did lookup and
producer/consumer got created
+ //ServiceUnitNotReady Result = 22
//OperationNotSupported Result = 23
//ProducerBlockedQuotaExceededError Result = 24 // Producer is
blocked
//ProducerBlockedQuotaExceededException Result = 25 // Producer is
getting exception
@@ -63,7 +67,8 @@ const (
TopicNotFound Result = 28 // Topic not found
SubscriptionNotFound Result = 29 // Subscription not found
//ConsumerNotFound Result = 30 // Consumer not
found
- //UnsupportedVersionError Result = 31 // Error when an
older client/version doesn't support a required feature
+ // UnsupportedVersionError when an older client/version doesn't support
a required feature
+ //UnsupportedVersionError Result = 31
//TopicTerminated Result = 32 // Topic was
already terminated
//CryptoError Result = 33 // Error when
crypto operation fails
)
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index cbd5c5d..ed2505a 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -114,12 +114,6 @@ func timeFromUnixTimestampMillis(timestamp uint64)
time.Time {
return time.Unix(seconds, nanos)
}
-func timeToUnixTimestampMillis(t time.Time) uint64 {
- nanos := t.UnixNano()
- millis := nanos / int64(time.Millisecond)
- return uint64(millis)
-}
-
type message struct {
publishTime time.Time
eventTime time.Time
diff --git a/pulsar/impl_message_test.go b/pulsar/impl_message_test.go
index e4ba3c1..54c3bee 100644
--- a/pulsar/impl_message_test.go
+++ b/pulsar/impl_message_test.go
@@ -75,8 +75,8 @@ func TestAckTracker(t *testing.T) {
func TestAckingMessageIDBatchOne(t *testing.T) {
tracker := newAckTracker(1)
- msgId := newTrackingMessageID(1, 1, 0, 0, tracker)
- assert.Equal(t, true, msgId.ack())
+ msgID := newTrackingMessageID(1, 1, 0, 0, tracker)
+ assert.Equal(t, true, msgID.ack())
assert.Equal(t, true, tracker.completed())
}
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 4b19438..33a1655 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -104,7 +104,7 @@ func (bb *BatchBuilder) hasSpace(payload []byte) bool {
}
// Add will add single message to batch.
-func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID
uint64, payload []byte,
+func (bb *BatchBuilder) Add(metadata proto.Message, sequenceID uint64, payload
[]byte,
callback interface{}, replicateTo []string) bool {
if replicateTo != nil && bb.numMessages != 0 {
// If the current batch is not empty and we're trying to set
the replication clusters,
diff --git a/pulsar/internal/blocking_queue_test.go
b/pulsar/internal/blocking_queue_test.go
index bf21da5..12bb8fc 100644
--- a/pulsar/internal/blocking_queue_test.go
+++ b/pulsar/internal/blocking_queue_test.go
@@ -55,7 +55,7 @@ func TestBlockingQueue(t *testing.T) {
time.Sleep(100 * time.Millisecond)
select {
- case _ = <-ch:
+ case <-ch:
assert.Fail(t, "Should not have had a value at this point")
default:
// Good, no value yet
@@ -100,7 +100,7 @@ func TestBlockingQueueWaitWhenFull(t *testing.T) {
time.Sleep(100 * time.Millisecond)
select {
- case _ = <-ch:
+ case <-ch:
assert.Fail(t, "Should not have had a value at this point")
default:
// Good, no value yet
@@ -109,7 +109,7 @@ func TestBlockingQueueWaitWhenFull(t *testing.T) {
assert.Equal(t, "test-1", q.Poll())
// Now the go-routine should have completed
- _ = <-ch
+ <-ch
assert.Equal(t, 3, q.Size())
assert.Equal(t, "test-2", q.Take())
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 874d4cd..59a3981 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -60,7 +60,8 @@ func NewMessageReaderFromArray(headersAndPayload []byte)
*MessageReader {
// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
//
// Batch format
-// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA]
[METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]
+// [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA]
[METADATA_SIZE][METADATA][PAYLOAD]
+// [METADATA_SIZE][METADATA][PAYLOAD]
//
type MessageReader struct {
buffer Buffer
@@ -191,7 +192,7 @@ func baseCommand(cmdType pb.BaseCommand_Type, msg
proto.Message) *pb.BaseCommand
return cmd
}
-func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload
[]byte) {
+func addSingleMessageToBatch(wb Buffer, smm proto.Message, payload []byte) {
serialized, err := proto.Marshal(smm)
if err != nil {
log.WithError(err).Fatal("Protobuf serialization error")
@@ -202,7 +203,7 @@ func addSingleMessageToBatch(wb Buffer, smm
*pb.SingleMessageMetadata, payload [
wb.Write(payload)
}
-func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata
*pb.MessageMetadata, payload []byte) {
+func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata
proto.Message, payload []byte) {
// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM]
[METADATA_SIZE][METADATA] [PAYLOAD]
cmdSize := proto.Size(cmdSend)
diff --git a/pulsar/internal/compression/zstd.go
b/pulsar/internal/compression/zstd.go
index b385c34..41fba0b 100644
--- a/pulsar/internal/compression/zstd.go
+++ b/pulsar/internal/compression/zstd.go
@@ -19,6 +19,7 @@ package compression
import (
"bytes"
+
"github.com/klauspost/compress/zstd"
"github.com/pkg/errors"
)
@@ -41,7 +42,7 @@ func (p *zstdProvider) Compress(data []byte) []byte {
return p.encoder.EncodeAll(data, []byte{})
}
-func (p* zstdProvider) Decompress(compressedData []byte, originalSize int)
([]byte, error) {
+func (p *zstdProvider) Decompress(compressedData []byte, originalSize int)
([]byte, error) {
d, err := zstd.NewReader(bytes.NewReader(compressedData))
if err != nil {
return nil, err
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index a42be14..7a7cd5f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -299,7 +299,7 @@ func (c *connection) run() {
for {
select {
- case <- c.closeCh:
+ case <-c.closeCh:
c.Close()
return
@@ -318,7 +318,7 @@ func (c *connection) run() {
}
c.internalWriteData(data)
- case _ = <-c.pingTicker.C:
+ case <-c.pingTicker.C:
c.sendPing()
}
}
@@ -327,9 +327,9 @@ func (c *connection) run() {
func (c *connection) runPingCheck() {
for {
select {
- case <- c.closeCh:
+ case <-c.closeCh:
return
- case _ = <-c.pingCheckTicker.C:
+ case <-c.pingCheckTicker.C:
if c.lastDataReceived().Add(2 *
keepAliveInterval).Before(time.Now()) {
// We have not received a response to the
previous Ping request, the
// connection to broker is stale
@@ -442,7 +442,8 @@ func (c *connection) Write(data []byte) {
c.writeRequestsCh <- data
}
-func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
callback func(command *pb.BaseCommand, err error)) {
+func (c *connection) SendRequest(requestID uint64, req *pb.BaseCommand,
+ callback func(command *pb.BaseCommand, err error)) {
c.incomingRequestsCh <- &request{
id: &requestID,
cmd: req,
@@ -487,8 +488,8 @@ func (c *connection) handleResponseError(serverError
*pb.CommandError) {
delete(c.pendingReqs, requestID)
- request.callback(nil,
- errors.New(fmt.Sprintf("server error: %s: %s",
serverError.GetError(), serverError.GetMessage())))
+ errMsg := fmt.Sprintf("server error: %s: %s", serverError.GetError(),
serverError.GetMessage())
+ request.callback(nil, errors.New(errMsg))
}
func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
@@ -583,8 +584,8 @@ func (c *connection) TriggerClose() {
}
select {
- case <- c.closeCh:
- return
+ case <-c.closeCh:
+ return
default:
close(c.closeCh)
}
@@ -693,7 +694,6 @@ func (c *connection) consumerHandler(id uint64)
(ConsumerHandler, bool) {
return h, ok
}
-func (c *connection) ID() (string) {
+func (c *connection) ID() string {
return fmt.Sprintf("%s -> %s", c.cnx.LocalAddr(), c.cnx.RemoteAddr())
}
-
diff --git a/pulsar/internal/default_router.go
b/pulsar/internal/default_router.go
index 6626351..e4d31ca 100644
--- a/pulsar/internal/default_router.go
+++ b/pulsar/internal/default_router.go
@@ -40,7 +40,8 @@ func NewSystemClock() Clock {
// NewDefaultRouter set the message routing mode for the partitioned producer.
// Default routing mode is round-robin routing.
-func NewDefaultRouter(clock Clock, hashFunc func(string) uint32,
maxBatchingDelay time.Duration) func(string, uint32) int {
+func NewDefaultRouter(clock Clock, hashFunc func(string) uint32,
+ maxBatchingDelay time.Duration) func(string, uint32) int {
state := &defaultRouter{
clock: clock,
shiftIdx: rand.Uint32(),
diff --git a/pulsar/internal/lookup_service.go
b/pulsar/internal/lookup_service.go
index 0b54c07..ee6dffe 100644
--- a/pulsar/internal/lookup_service.go
+++ b/pulsar/internal/lookup_service.go
@@ -54,7 +54,8 @@ func NewLookupService(rpcClient RPCClient, serviceURL
*url.URL) LookupService {
}
}
-func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse)
(logicalAddress *url.URL, physicalAddress *url.URL, err error) {
+func (ls *lookupService) getBrokerAddress(lr *pb.CommandLookupTopicResponse)
(logicalAddress *url.URL,
+ physicalAddress *url.URL, err error) {
logicalAddress, err = url.ParseRequestURI(lr.GetBrokerServiceUrl())
if err != nil {
return nil, nil, err
diff --git a/pulsar/internal/lookup_service_test.go
b/pulsar/internal/lookup_service_test.go
index 9a9f053..5bb1724 100644
--- a/pulsar/internal/lookup_service_test.go
+++ b/pulsar/internal/lookup_service_test.go
@@ -49,7 +49,8 @@ func (c *mockedRPCClient) NewConsumerID() uint64 {
return 1
}
-func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType
pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
+func (c *mockedRPCClient) RequestToAnyBroker(requestID uint64, cmdType
pb.BaseCommand_Type,
+ message proto.Message) (*RPCResult, error) {
assert.Equal(c.t, cmdType, pb.BaseCommand_LOOKUP)
expectedRequest := &c.expectedRequests[0]
@@ -90,7 +91,8 @@ func (c *mockedRPCClient) Request(logicalAddr *url.URL,
physicalAddr *url.URL, r
}, nil
}
-func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
+func (c *mockedRPCClient) RequestOnCnx(cnx Connection, requestID uint64,
cmdType pb.BaseCommand_Type,
+ message proto.Message) (*RPCResult, error) {
assert.Fail(c.t, "Shouldn't be called")
return nil, nil
}
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index aa89b59..23c3b61 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -69,7 +69,8 @@ func NewRPCClient(serviceURL *url.URL, pool ConnectionPool,
requestTimeout time.
}
}
-func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType
pb.BaseCommand_Type, message proto.Message) (*RPCResult, error) {
+func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType
pb.BaseCommand_Type,
+ message proto.Message) (*RPCResult, error) {
return c.Request(c.serviceURL, c.serviceURL, requestID, cmdType,
message)
}
@@ -81,7 +82,10 @@ func (c *rpcClient) Request(logicalAddr *url.URL,
physicalAddr *url.URL, request
return nil, err
}
- type Res struct {*RPCResult; error}
+ type Res struct {
+ *RPCResult
+ error
+ }
ch := make(chan Res)
// TODO: Handle errors with disconnections
@@ -110,7 +114,7 @@ func (c *rpcClient) RequestOnCnx(cnx Connection, requestID
uint64, cmdType pb.Ba
Cnx: cnx,
}
- var rpcErr error = nil
+ var rpcErr error
cnx.SendRequest(requestID, baseCommand(cmdType, message), func(response
*pb.BaseCommand, err error) {
rpcResult.Response = response
rpcErr = err
diff --git a/pulsar/message.go b/pulsar/message.go
index dd4fcff..cbb3d63 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -59,7 +59,8 @@ type Message interface {
// The message id can be used to univocally refer to a message without
having the keep the entire payload in memory.
ID() MessageID
- // PublishTime get the publish time of this message. The publish time
is the timestamp that a client publish the message.
+ // PublishTime get the publish time of this message. The publish time
is the timestamp that a client
+ // publish the message.
PublishTime() time.Time
// EventTime get the event time associated with this message. It is
typically set by the applications via
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index e8a79e8..f887844 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -18,9 +18,10 @@
package pulsar
import (
- log "github.com/sirupsen/logrus"
"sync"
"time"
+
+ log "github.com/sirupsen/logrus"
)
type redeliveryConsumer interface {
@@ -53,7 +54,7 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay
time.Duration) *negativ
func (t *negativeAcksTracker) Add(msgID *messageID) {
// Always clear up the batch index since we want to track the nack
// for the entire batch
- batchMsgId := messageID{
+ batchMsgID := messageID{
ledgerID: msgID.ledgerID,
entryID: msgID.entryID,
batchIdx: 0,
@@ -62,14 +63,14 @@ func (t *negativeAcksTracker) Add(msgID *messageID) {
t.Lock()
defer t.Unlock()
- _, present := t.negativeAcks[batchMsgId]
+ _, present := t.negativeAcks[batchMsgID]
if present {
// The batch is already being tracked
return
- } else {
- targetTime := time.Now().Add(t.delay)
- t.negativeAcks[batchMsgId] = targetTime
}
+
+ targetTime := time.Now().Add(t.delay)
+ t.negativeAcks[batchMsgID] = targetTime
}
func (t *negativeAcksTracker) track() {
diff --git a/pulsar/negative_acks_tracker_test.go
b/pulsar/negative_acks_tracker_test.go
index 93c1af2..f7a1c50 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -32,7 +32,6 @@ type nackMockedConsumer struct {
ch chan messageID
closed bool
lock sync.Mutex
- msgIds []messageID
}
func newNackMockedConsumer() *nackMockedConsumer {
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 6a68e7b..46103b5 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -63,7 +63,8 @@ type ProducerOptions struct {
// This properties will be visible in the topic stats
Properties map[string]string
- // MaxPendingMessages set the max size of the queue holding the
messages pending to receive an acknowledgment from the broker.
+ // MaxPendingMessages set the max size of the queue holding the
messages pending to receive an
+ // acknowledgment from the broker.
MaxPendingMessages int
// HashingScheme change the `HashingScheme` used to chose the partition
on where to publish a particular message.
@@ -101,12 +102,14 @@ type ProducerOptions struct {
// Setting `DisableBatching: true` will make the producer to send
messages individually
DisableBatching bool
- // BatchingMaxPublishDelay set the time period within which the
messages sent will be batched (default: 10ms) if batch messages are
- // enabled. If set to a non zero value, messages will be queued until
this time interval or until
+ // BatchingMaxPublishDelay set the time period within which the
messages sent will be batched (default: 10ms)
+ // if batch messages are enabled. If set to a non zero value, messages
will be queued until this time
+ // interval or until
BatchingMaxPublishDelay time.Duration
- // BatchingMaxMessages set the maximum number of messages permitted in
a batch. (default: 1000) If set to a value greater than 1,
- // messages will be queued until this threshold is reached or batch
interval has elapsed
+ // BatchingMaxMessages set the maximum number of messages permitted in
a batch. (default: 1000)
+ // If set to a value greater than 1, messages will be queued until this
threshold is reached or
+ // batch interval has elapsed.
BatchingMaxMessages uint
}
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index ce97c22..101929d 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -129,7 +129,8 @@ func (p *producer) Send(ctx context.Context, msg
*ProducerMessage) (MessageID, e
return p.producers[partition].Send(ctx, msg)
}
-func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error)) {
+func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
+ callback func(MessageID, *ProducerMessage, error)) {
partition := p.messageRouter(msg, p)
p.producers[partition].SendAsync(ctx, msg, callback)
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 47dff85..f96a706 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -66,8 +66,8 @@ type partitionProducer struct {
const defaultBatchingMaxPublishDelay = 10 * time.Millisecond
-func newPartitionProducer(client *client, topic string, options
*ProducerOptions, partitionIdx int) (*partitionProducer, error) {
-
+func newPartitionProducer(client *client, topic string, options
*ProducerOptions, partitionIdx int) (
+ *partitionProducer, error) {
var batchingMaxPublishDelay time.Duration
if options.BatchingMaxPublishDelay != 0 {
batchingMaxPublishDelay = options.BatchingMaxPublishDelay
@@ -126,11 +126,11 @@ func (p *partitionProducer) grabCnx() error {
p.log.Debug("Lookup result: ", lr)
id := p.client.rpcClient.NewRequestID()
cmdProducer := &pb.CommandProducer{
- RequestId: proto.Uint64(id),
- Topic: proto.String(p.topic),
- Encrypted: nil,
- ProducerId: proto.Uint64(p.producerID),
- Schema: nil,
+ RequestId: proto.Uint64(id),
+ Topic: proto.String(p.topic),
+ Encrypted: nil,
+ ProducerId: proto.Uint64(p.producerID),
+ Schema: nil,
}
if p.producerName != "" {
@@ -218,7 +218,7 @@ func (p *partitionProducer) runEventsLoop() {
return
}
- case _ = <-p.batchFlushTicker.C:
+ case <-p.batchFlushTicker.C:
p.internalFlushCurrentBatch()
}
}
@@ -341,11 +341,11 @@ func (p *partitionProducer) Send(ctx context.Context, msg
*ProducerMessage) (Mes
wg.Add(1)
var err error
- var msgId MessageID
+ var msgID MessageID
p.internalSendAsync(ctx, msg, func(ID MessageID, message
*ProducerMessage, e error) {
err = e
- msgId = ID
+ msgID = ID
wg.Done()
}, true)
@@ -356,7 +356,7 @@ func (p *partitionProducer) Send(ctx context.Context, msg
*ProducerMessage) (Mes
}
wg.Wait()
- return msgId, err
+ return msgID, err
}
func (p *partitionProducer) SendAsync(ctx context.Context, msg
*ProducerMessage,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 189902c..a387d3d 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -20,12 +20,12 @@ package pulsar
import (
"context"
"fmt"
- "github.com/apache/pulsar-client-go/pulsar/internal"
"net/http"
"sync"
"testing"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/stretchr/testify/assert"
log "github.com/sirupsen/logrus"
@@ -384,6 +384,7 @@ func TestFlushInPartitionedProducer(t *testing.T) {
BatchingMaxMessages: uint(numOfMessages /
numberOfPartitions),
BatchingMaxPublishDelay: time.Second * 10,
})
+ assert.Nil(t, err)
defer producer.Close()
// send 5 messages
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index d8cdc6d..3471bca 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -21,7 +21,6 @@ import (
"bytes"
"encoding/json"
"fmt"
- "github.com/apache/pulsar-client-go/pulsar/internal"
"io"
"io/ioutil"
"net/http"
@@ -30,6 +29,8 @@ import (
"testing"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/internal"
+
pkgerrors "github.com/pkg/errors"
)
@@ -141,10 +142,6 @@ func createNamespace(namespace string, policy
map[string]interface{}) error {
return httpPut("admin/v2/namespaces/"+namespace, policy)
}
-func deleteNamespace(namespace string) error {
- return httpDelete("admin/v2/namespaces/" + namespace)
-}
-
func createTopic(topic string) error {
return httpPut("admin/v2/persistent/"+topic, nil)
}
diff --git a/run-ci.sh b/run-ci.sh
index 23896f4..2de0eee 100755
--- a/run-ci.sh
+++ b/run-ci.sh
@@ -31,7 +31,7 @@ go build -o pulsar-perf ./perf
./pulsar-test-service-start.sh
-go test -race -coverprofile=/tmp/coverage -timeout=1h ./...
+go test -race -coverprofile=/tmp/coverage -timeout=20m ./...
go tool cover -html=/tmp/coverage -o coverage.html
./pulsar-test-service-stop.sh