This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f491e09e [Improve][Producer] normalize and export the errors (#1143)
f491e09e is described below
commit f491e09e025ad72806f2f49b4a195b3c5b27d55a
Author: gunli <[email protected]>
AuthorDate: Fri Dec 29 10:57:27 2023 +0800
[Improve][Producer] normalize and export the errors (#1143)
* [Improve][Producer] normalize and export the errors
* update schema error
* update go version to 1.20 to support errors.Join()
* use errors.Is() to test an error
* use github.com/hashicorp/go-multierror to join errors instead of
errors.Join() of Go 1.20
* revert go version to 1.18
* revert go version to 1.18
* update ErrSchema according to the CR suggestions
* update ErrTransaction to a normal error
* rename ErrProducerBlocked to ErrProducerBlockedQuotaExceeded
* add license header
* fix unit test error
---------
Co-authored-by: gunli <[email protected]>
---
go.mod | 2 ++
go.sum | 4 +++
pulsar/consumer_test.go | 2 +-
pulsar/error.go | 8 +++++
pulsar/error_test.go | 36 +++++++++++++++++++
pulsar/producer_partition.go | 84 ++++++++++++++++++++++++--------------------
pulsar/producer_test.go | 15 ++++----
7 files changed, 104 insertions(+), 47 deletions(-)
diff --git a/go.mod b/go.mod
index f88d6ad5..52bfe43a 100644
--- a/go.mod
+++ b/go.mod
@@ -43,6 +43,8 @@ require (
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c //
indirect
+ github.com/hashicorp/errwrap v1.0.0 // indirect
+ github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
diff --git a/go.sum b/go.sum
index 00a44917..e8a9e76b 100644
--- a/go.sum
+++ b/go.sum
@@ -160,6 +160,10 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod
h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/gorilla/mux v1.7.4/go.mod
h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c
h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod
h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
+github.com/hashicorp/errwrap v1.0.0
h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
+github.com/hashicorp/errwrap v1.0.0/go.mod
h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/go-multierror v1.1.1
h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
+github.com/hashicorp/go-multierror v1.1.1/go.mod
h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/golang-lru v0.5.0/go.mod
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hpcloud/tail v1.0.0/go.mod
h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 33382cff..8b983d0d 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -4299,7 +4299,7 @@ func TestConsumerMemoryLimit(t *testing.T) {
Payload: createTestMessagePayload(1),
})
// Producer can't send message
- assert.Equal(t, true, errors.Is(err, errMemoryBufferIsFull))
+ assert.Equal(t, true, errors.Is(err, ErrMemoryBufferIsFull))
}
func TestMultiConsumerMemoryLimit(t *testing.T) {
diff --git a/pulsar/error.go b/pulsar/error.go
index 25498cfb..f0379934 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -21,6 +21,7 @@ import (
"fmt"
proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/hashicorp/go-multierror"
)
// Result used to represent pulsar processing is an alias of type int.
@@ -245,3 +246,10 @@ func getErrorFromServerError(serverError
*proto.ServerError) error {
return newError(UnknownError, serverError.String())
}
}
+
+// joinErrors can join multiple errors into one error, and the returned error
can be tested by errors.Is()
+// we use github.com/hashicorp/go-multierror instead of errors.Join() of Go
1.20 so that we can compile pulsar
+// go client with go versions that are newer than go 1.13
+func joinErrors(errs ...error) error {
+ return multierror.Append(nil, errs...)
+}
diff --git a/pulsar/error_test.go b/pulsar/error_test.go
new file mode 100644
index 00000000..5403effc
--- /dev/null
+++ b/pulsar/error_test.go
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+ "errors"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_joinErrors(t *testing.T) {
+ err1 := errors.New("err1")
+ err2 := errors.New("err2")
+ err3 := errors.New("err3")
+ err := joinErrors(ErrInvalidMessage, err1, err2)
+ assert.True(t, errors.Is(err, ErrInvalidMessage))
+ assert.True(t, errors.Is(err, err1))
+ assert.True(t, errors.Is(err, err2))
+ assert.False(t, errors.Is(err, err3))
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 46167d0c..1b79053e 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -51,14 +51,21 @@ const (
)
var (
- errFailAddToBatch = newError(AddToBatchFailed, "message add to
batch failed")
- errSendTimeout = newError(TimeoutError, "message send timeout")
- errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send
queue is full")
- errContextExpired = newError(TimeoutError, "message send context
expired")
- errMessageTooLarge = newError(MessageTooBig, "message size exceeds
MaxMessageSize")
- errMetaTooLarge = newError(InvalidMessage, "message metadata size
exceeds MaxMessageSize")
- errProducerClosed = newError(ProducerClosed, "producer already been
closed")
- errMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client
memory buffer is full")
+ ErrFailAddToBatch = newError(AddToBatchFailed, "message
add to batch failed")
+ ErrSendTimeout = newError(TimeoutError, "message send
timeout")
+ ErrSendQueueIsFull = newError(ProducerQueueIsFull,
"producer send queue is full")
+ ErrContextExpired = newError(TimeoutError, "message send
context expired")
+ ErrMessageTooLarge = newError(MessageTooBig, "message size
exceeds MaxMessageSize")
+ ErrMetaTooLarge = newError(InvalidMessage, "message
metadata size exceeds MaxMessageSize")
+ ErrProducerClosed = newError(ProducerClosed, "producer
already been closed")
+ ErrMemoryBufferIsFull = newError(ClientMemoryBufferIsFull,
"client memory buffer is full")
+ ErrSchema = newError(SchemaFailure, "schema
error")
+ ErrTransaction = errors.New("transaction error")
+ ErrInvalidMessage = newError(InvalidMessage, "invalid
message")
+ ErrTopicNotfound = newError(TopicNotFound, "topic not
found")
+ ErrTopicTerminated = newError(TopicTerminated, "topic
terminated")
+ ErrProducerBlockedQuotaExceeded =
newError(ProducerBlockedQuotaExceededException, "producer blocked")
+ ErrProducerFenced = newError(ProducerFenced, "producer
fenced")
buffersPool sync.Pool
sendRequestPool *sync.Pool
@@ -449,25 +456,25 @@ func (p *partitionProducer) reconnectToBroker() {
if strings.Contains(errMsg, errMsgTopicNotFound) {
// when topic is deleted, we should give up
reconnection.
p.log.Warn("Topic not found, stop reconnecting, close
the producer")
- p.doClose(newError(TopicNotFound, err.Error()))
+ p.doClose(joinErrors(ErrTopicNotfound, err))
break
}
if strings.Contains(errMsg, errMsgTopicTerminated) {
p.log.Warn("Topic was terminated, failing pending
messages, stop reconnecting, close the producer")
- p.doClose(newError(TopicTerminated, err.Error()))
+ p.doClose(joinErrors(ErrTopicTerminated, err))
break
}
if strings.Contains(errMsg,
errMsgProducerBlockedQuotaExceededException) {
p.log.Warn("Producer was blocked by quota exceed
exception, failing pending messages, stop reconnecting")
-
p.failPendingMessages(newError(ProducerBlockedQuotaExceededException,
err.Error()))
+
p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err))
break
}
if strings.Contains(errMsg, errMsgProducerFenced) {
p.log.Warn("Producer was fenced, failing pending
messages, stop reconnecting")
- p.doClose(newError(ProducerFenced, err.Error()))
+ p.doClose(joinErrors(ErrProducerFenced, err))
break
}
@@ -547,7 +554,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
p.log.WithField("size", sr.uncompressedSize).
WithField("properties",
sr.msg.Properties).
Error("unable to add message to batch")
- sr.done(nil, errFailAddToBatch)
+ sr.done(nil, ErrFailAddToBatch)
return
}
}
@@ -802,7 +809,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
}
if errors.Is(err, internal.ErrExceedMaxMessageSize) {
- p.log.WithError(errMessageTooLarge).Errorf("internal
err: %s", err)
+ p.log.WithError(ErrMessageTooLarge).Errorf("internal
err: %s", err)
}
return
@@ -893,11 +900,11 @@ func (p *partitionProducer) failTimeoutMessages() {
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
- sr.done(nil, errSendTimeout)
+ sr.done(nil, ErrSendTimeout)
}
// flag the sending has completed with error, flush
make no effect
- pi.done(errSendTimeout)
+ pi.done(ErrSendTimeout)
pi.Unlock()
// finally reached the last view item, current
iteration ends
@@ -926,7 +933,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
}
if errors.Is(errs[i], internal.ErrExceedMaxMessageSize)
{
-
p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", errs[i])
+
p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", errs[i])
return
}
@@ -1019,18 +1026,18 @@ func (p *partitionProducer) SendAsync(ctx
context.Context, msg *ProducerMessage,
func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
if msg == nil {
- return newError(InvalidMessage, "Message is nil")
+ return joinErrors(ErrInvalidMessage, fmt.Errorf("message is
nil"))
}
if msg.Value != nil && msg.Payload != nil {
- return newError(InvalidMessage, "Can not set Value and Payload
both")
+ return joinErrors(ErrInvalidMessage, fmt.Errorf("can not set
Value and Payload both"))
}
if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() !=
p.options.Schema.GetSchemaInfo().hash() {
p.log.Errorf("The producer %s of the topic %s is
disabled the `MultiSchema`", p.producerName, p.topic)
- return fmt.Errorf("msg schema can not match with
producer schema")
+ return joinErrors(ErrSchema, fmt.Errorf("msg schema can
not match with producer schema"))
}
}
@@ -1046,15 +1053,16 @@ func (p *partitionProducer) prepareTransaction(sr
*sendRequest) error {
if txn.state != TxnOpen {
p.log.WithField("state", txn.state).Error("Failed to send
message" +
" by a non-open transaction.")
- return newError(InvalidStatus, "Failed to send message by a
non-open transaction.")
+ return joinErrors(ErrTransaction,
+ fmt.Errorf("failed to send message by a non-open
transaction"))
}
if err := txn.registerProducerTopic(p.topic); err != nil {
- return err
+ return joinErrors(ErrTransaction, err)
}
if err := txn.registerSendOrAckOp(); err != nil {
- return err
+ return joinErrors(ErrTransaction, err)
}
sr.transaction = txn
@@ -1080,7 +1088,7 @@ func (p *partitionProducer) updateSchema(sr *sendRequest)
error {
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
- return fmt.Errorf("get schema version fail, err: %w",
err)
+ return joinErrors(ErrSchema, fmt.Errorf("get schema
version fail, err: %w", err))
}
p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
}
@@ -1097,7 +1105,7 @@ func (p *partitionProducer) updateUncompressedPayload(sr
*sendRequest) error {
if sr.msg.Value != nil {
if sr.schema == nil {
p.log.Errorf("Schema encode message failed %s",
sr.msg.Value)
- return newError(SchemaFailure, "set schema value
without setting schema")
+ return joinErrors(ErrSchema, fmt.Errorf("set schema
value without setting schema"))
}
// payload and schema are mutually exclusive
@@ -1105,7 +1113,7 @@ func (p *partitionProducer) updateUncompressedPayload(sr
*sendRequest) error {
schemaPayload, err := sr.schema.Encode(sr.msg.Value)
if err != nil {
p.log.WithError(err).Errorf("Schema encode message
failed %s", sr.msg.Value)
- return newError(SchemaFailure, err.Error())
+ return joinErrors(ErrSchema, err)
}
sr.uncompressedPayload = schemaPayload
@@ -1160,11 +1168,11 @@ func (p *partitionProducer) updateChunkInfo(sr
*sendRequest) error {
// if msg is too large and chunking is disabled
if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
- p.log.WithError(errMessageTooLarge).
+ p.log.WithError(ErrMessageTooLarge).
WithField("size", checkSize).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d", sr.maxMessageSize)
- return errMessageTooLarge
+ return ErrMessageTooLarge
}
if sr.sendAsBatch || !p.options.EnableChunking {
@@ -1173,11 +1181,11 @@ func (p *partitionProducer) updateChunkInfo(sr
*sendRequest) error {
} else {
sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
if sr.payloadChunkSize <= 0 {
- p.log.WithError(errMetaTooLarge).
+ p.log.WithError(ErrMetaTooLarge).
WithField("metadata size", proto.Size(sr.mm)).
WithField("properties", sr.msg.Properties).
Errorf("MaxMessageSize %d",
int(p._getConn().GetMaxMessageSize()))
- return errMetaTooLarge
+ return ErrMetaTooLarge
}
// set ChunkMaxMessageSize
if p.options.ChunkMaxMessageSize != 0 {
@@ -1220,7 +1228,7 @@ func (p *partitionProducer) internalSendAsync(
}
if p.getProducerState() != producerReady {
- sr.done(nil, errProducerClosed)
+ sr.done(nil, ErrProducerClosed)
return
}
@@ -1333,7 +1341,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response
*pb.CommandSendReceipt)
func (p *partitionProducer) internalClose(req *closeProducer) {
defer close(req.doneCh)
- p.doClose(errProducerClosed)
+ p.doClose(ErrProducerClosed)
}
func (p *partitionProducer) doClose(reason error) {
@@ -1508,11 +1516,11 @@ func (sr *sendRequest) done(msgID MessageID, err error)
{
WithField("properties", sr.msg.Properties)
}
- if errors.Is(err, errSendTimeout) {
+ if errors.Is(err, ErrSendTimeout) {
sr.producer.metrics.PublishErrorsTimeout.Inc()
}
- if errors.Is(err, errMessageTooLarge) {
+ if errors.Is(err, ErrMessageTooLarge) {
sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
}
@@ -1554,7 +1562,7 @@ func (p *partitionProducer) reserveSemaphore(sr
*sendRequest) error {
for i := 0; i < sr.totalChunks; i++ {
if p.blockIfQueueFull() {
if !p.publishSemaphore.Acquire(sr.ctx) {
- return errContextExpired
+ return ErrContextExpired
}
// update sr.semaphore and sr.reservedSemaphore here so
that we can release semaphore in the case
@@ -1564,7 +1572,7 @@ func (p *partitionProducer) reserveSemaphore(sr
*sendRequest) error {
p.metrics.MessagesPending.Inc()
} else {
if !p.publishSemaphore.TryAcquire() {
- return errSendQueueIsFull
+ return ErrSendQueueIsFull
}
// update sr.semaphore and sr.reservedSemaphore here so
that we can release semaphore in the case
@@ -1586,11 +1594,11 @@ func (p *partitionProducer) reserveMem(sr *sendRequest)
error {
if p.blockIfQueueFull() {
if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) {
- return errContextExpired
+ return ErrContextExpired
}
} else {
if !p.client.memLimit.TryReserveMemory(requiredMem) {
- return errMemoryBufferIsFull
+ return ErrMemoryBufferIsFull
}
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 0f890692..0d74cdee 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1061,8 +1061,8 @@ func TestMaxMessageSize(t *testing.T) {
assert.NoError(t, err)
defer client.Close()
- // Need to set BatchingMaxSize > serverMaxMessageSize to avoid
errMessageTooLarge
- // being masked by an earlier errFailAddToBatch
+ // Need to set BatchingMaxSize > serverMaxMessageSize to avoid
ErrMessageTooLarge
+ // being masked by an earlier ErrFailAddToBatch
producer, err := client.CreateProducer(ProducerOptions{
Topic: newTopicName(),
BatchingMaxSize: uint(2 * serverMaxMessageSize),
@@ -1088,7 +1088,7 @@ func TestMaxMessageSize(t *testing.T) {
// So when bias <= 0, the uncompressed payload will not exceed
maxMessageSize,
// but encryptedPayloadSize exceeds maxMessageSize, Send() will return
an internal error.
// When bias = 1, the first check of maxMessageSize (for uncompressed
payload) is valid,
- // Send() will return errMessageTooLarge
+ // Send() will return ErrMessageTooLarge
for bias := -1; bias <= 1; bias++ {
payload := make([]byte, serverMaxMessageSize+bias)
ID, err := producer.Send(context.Background(), &ProducerMessage{
@@ -1098,7 +1098,7 @@ func TestMaxMessageSize(t *testing.T) {
assert.Equal(t, true, errors.Is(err,
internal.ErrExceedMaxMessageSize))
assert.Nil(t, ID)
} else {
- assert.Equal(t, errMessageTooLarge, err)
+ assert.True(t, errors.Is(err, ErrMessageTooLarge))
}
}
@@ -1111,7 +1111,7 @@ func TestMaxMessageSize(t *testing.T) {
assert.Equal(t, true, errors.Is(err,
internal.ErrExceedMaxMessageSize))
assert.Nil(t, ID)
} else {
- assert.Equal(t, errMessageTooLarge, err)
+ assert.True(t, errors.Is(err, ErrMessageTooLarge))
}
}
}
@@ -1190,8 +1190,7 @@ func TestTopicTermination(t *testing.T) {
Payload: make([]byte, 1024),
})
if err != nil {
- e := err.(*Error)
- if e.result == TopicTerminated || err ==
errProducerClosed {
+ if errors.Is(err, ErrTopicTerminated) ||
errors.Is(err, ErrProducerClosed) {
terminatedChan <- true
} else {
terminatedChan <- false
@@ -2348,7 +2347,7 @@ func TestFailPendingMessageWithClose(t *testing.T) {
Payload: make([]byte, 1024),
}, func(id MessageID, message *ProducerMessage, e error) {
if e != nil {
- assert.Equal(t, errProducerClosed, e)
+ assert.True(t, errors.Is(e, ErrProducerClosed))
}
})
}