This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f491e09e [Improve][Producer] normalize and export the errors (#1143)
f491e09e is described below

commit f491e09e025ad72806f2f49b4a195b3c5b27d55a
Author: gunli <[email protected]>
AuthorDate: Fri Dec 29 10:57:27 2023 +0800

    [Improve][Producer] normalize and export the errors (#1143)
    
    * [Improve][Producer] normalize and export the errors
    
    * update schema error
    
    * update go version to 1.20 to support errors.Join()
    
    * use errors.Is() to test an error
    
    * use github.com/hashicorp/go-multierror to join errors instead of errors.Join() of Go 1.20
    
    * revert go version to 1.18
    
    * revert go version to 1.18
    
    * update ErrSchema according to the CR suggestions
    
    * update ErrTransaction to a normal error
    
    * rename ErrProducerBlocked to ErrProducerBlockedQuotaExceeded
    
    * add license header
    
    * fix unit test error
    
    ---------
    
    Co-authored-by: gunli <[email protected]>
---
 go.mod                       |  2 ++
 go.sum                       |  4 +++
 pulsar/consumer_test.go      |  2 +-
 pulsar/error.go              |  8 +++++
 pulsar/error_test.go         | 36 +++++++++++++++++++
 pulsar/producer_partition.go | 84 ++++++++++++++++++++++++--------------------
 pulsar/producer_test.go      | 15 ++++----
 7 files changed, 104 insertions(+), 47 deletions(-)
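
For illustration only (not part of the commit): a minimal sketch of how application code can now test the exported producer errors with errors.Is(). The broker URL and topic name below are placeholders.

package main

import (
    "context"
    "errors"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

func main() {
    // Placeholder URL; adjust for your cluster.
    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Placeholder topic name.
    producer, err := client.CreateProducer(pulsar.ProducerOptions{Topic: "my-topic"})
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{Payload: []byte("hello")})
    switch {
    case err == nil:
        // sent and acked
    case errors.Is(err, pulsar.ErrSendQueueIsFull):
        // queue is full: back off and retry
    case errors.Is(err, pulsar.ErrProducerClosed):
        // producer is closed: recreate it before retrying
    default:
        log.Println("send failed:", err)
    }
}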

diff --git a/go.mod b/go.mod
index f88d6ad5..52bfe43a 100644
--- a/go.mod
+++ b/go.mod
@@ -43,6 +43,8 @@ require (
        github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
        github.com/golang/snappy v0.0.1 // indirect
        github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
+       github.com/hashicorp/errwrap v1.0.0 // indirect
+       github.com/hashicorp/go-multierror v1.1.1 // indirect
        github.com/inconshreveable/mousetrap v1.0.1 // indirect
        github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
        github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
diff --git a/go.sum b/go.sum
index 00a44917..e8a9e76b 100644
--- a/go.sum
+++ b/go.sum
@@ -160,6 +160,10 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
 github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
+github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
+github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
+github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
+github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 33382cff..8b983d0d 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -4299,7 +4299,7 @@ func TestConsumerMemoryLimit(t *testing.T) {
                Payload: createTestMessagePayload(1),
        })
        // Producer can't send message
-       assert.Equal(t, true, errors.Is(err, errMemoryBufferIsFull))
+       assert.Equal(t, true, errors.Is(err, ErrMemoryBufferIsFull))
 }
 
 func TestMultiConsumerMemoryLimit(t *testing.T) {
diff --git a/pulsar/error.go b/pulsar/error.go
index 25498cfb..f0379934 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -21,6 +21,7 @@ import (
        "fmt"
 
        proto "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       "github.com/hashicorp/go-multierror"
 )
 
 // Result used to represent pulsar processing is an alias of type int.
@@ -245,3 +246,10 @@ func getErrorFromServerError(serverError *proto.ServerError) error {
                return newError(UnknownError, serverError.String())
        }
 }
+
+// joinErrors can join multiple errors into one error, and the returned error can be tested by errors.Is().
+// We use github.com/hashicorp/go-multierror instead of errors.Join() of Go 1.20 so that the Pulsar Go
+// client can still be compiled with Go versions newer than 1.13.
+func joinErrors(errs ...error) error {
+       return multierror.Append(nil, errs...)
+}
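
For context, a standalone sketch of the go-multierror behavior that joinErrors relies on; errSentinel below is a hypothetical stand-in for one of the exported errors such as ErrSchema.

package main

import (
    "errors"
    "fmt"

    "github.com/hashicorp/go-multierror"
)

// errSentinel stands in for an exported sentinel such as ErrSchema.
var errSentinel = errors.New("schema error")

func main() {
    cause := fmt.Errorf("encode failed: bad field")

    // Same shape as joinErrors above: append all errors into one value.
    joined := multierror.Append(nil, errSentinel, cause)

    // go-multierror implements Unwrap(), so errors.Is() sees every member.
    fmt.Println(errors.Is(joined, errSentinel)) // true
    fmt.Println(errors.Is(joined, cause))       // true
}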
diff --git a/pulsar/error_test.go b/pulsar/error_test.go
new file mode 100644
index 00000000..5403effc
--- /dev/null
+++ b/pulsar/error_test.go
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "errors"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func Test_joinErrors(t *testing.T) {
+       err1 := errors.New("err1")
+       err2 := errors.New("err2")
+       err3 := errors.New("err3")
+       err := joinErrors(ErrInvalidMessage, err1, err2)
+       assert.True(t, errors.Is(err, ErrInvalidMessage))
+       assert.True(t, errors.Is(err, err1))
+       assert.True(t, errors.Is(err, err2))
+       assert.False(t, errors.Is(err, err3))
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 46167d0c..1b79053e 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -51,14 +51,21 @@ const (
 )
 
 var (
-       errFailAddToBatch     = newError(AddToBatchFailed, "message add to batch failed")
-       errSendTimeout        = newError(TimeoutError, "message send timeout")
-       errSendQueueIsFull    = newError(ProducerQueueIsFull, "producer send queue is full")
-       errContextExpired     = newError(TimeoutError, "message send context expired")
-       errMessageTooLarge    = newError(MessageTooBig, "message size exceeds MaxMessageSize")
-       errMetaTooLarge       = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
-       errProducerClosed     = newError(ProducerClosed, "producer already been closed")
-       errMemoryBufferIsFull = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
+       ErrFailAddToBatch               = newError(AddToBatchFailed, "message add to batch failed")
+       ErrSendTimeout                  = newError(TimeoutError, "message send timeout")
+       ErrSendQueueIsFull              = newError(ProducerQueueIsFull, "producer send queue is full")
+       ErrContextExpired               = newError(TimeoutError, "message send context expired")
+       ErrMessageTooLarge              = newError(MessageTooBig, "message size exceeds MaxMessageSize")
+       ErrMetaTooLarge                 = newError(InvalidMessage, "message metadata size exceeds MaxMessageSize")
+       ErrProducerClosed               = newError(ProducerClosed, "producer already been closed")
+       ErrMemoryBufferIsFull           = newError(ClientMemoryBufferIsFull, "client memory buffer is full")
+       ErrSchema                       = newError(SchemaFailure, "schema error")
+       ErrTransaction                  = errors.New("transaction error")
+       ErrInvalidMessage               = newError(InvalidMessage, "invalid message")
+       ErrTopicNotfound                = newError(TopicNotFound, "topic not found")
+       ErrTopicTerminated              = newError(TopicTerminated, "topic terminated")
+       ErrProducerBlockedQuotaExceeded = newError(ProducerBlockedQuotaExceededException, "producer blocked")
+       ErrProducerFenced               = newError(ProducerFenced, "producer fenced")
 
        buffersPool     sync.Pool
        sendRequestPool *sync.Pool
@@ -449,25 +456,25 @@ func (p *partitionProducer) reconnectToBroker() {
                if strings.Contains(errMsg, errMsgTopicNotFound) {
                        // when topic is deleted, we should give up reconnection.
                        p.log.Warn("Topic not found, stop reconnecting, close the producer")
-                       p.doClose(newError(TopicNotFound, err.Error()))
+                       p.doClose(joinErrors(ErrTopicNotfound, err))
                        break
                }
 
                if strings.Contains(errMsg, errMsgTopicTerminated) {
                        p.log.Warn("Topic was terminated, failing pending 
messages, stop reconnecting, close the producer")
-                       p.doClose(newError(TopicTerminated, err.Error()))
+                       p.doClose(joinErrors(ErrTopicTerminated, err))
                        break
                }
 
                if strings.Contains(errMsg, errMsgProducerBlockedQuotaExceededException) {
                        p.log.Warn("Producer was blocked by quota exceed exception, failing pending messages, stop reconnecting")
-                       p.failPendingMessages(newError(ProducerBlockedQuotaExceededException, err.Error()))
+                       p.failPendingMessages(joinErrors(ErrProducerBlockedQuotaExceeded, err))
                        break
                }
 
                if strings.Contains(errMsg, errMsgProducerFenced) {
                        p.log.Warn("Producer was fenced, failing pending 
messages, stop reconnecting")
-                       p.doClose(newError(ProducerFenced, err.Error()))
+                       p.doClose(joinErrors(ErrProducerFenced, err))
                        break
                }
 
@@ -547,7 +554,7 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
                                p.log.WithField("size", sr.uncompressedSize).
                                        WithField("properties", 
sr.msg.Properties).
                                        Error("unable to add message to batch")
-                               sr.done(nil, errFailAddToBatch)
+                               sr.done(nil, ErrFailAddToBatch)
                                return
                        }
                }
@@ -802,7 +809,7 @@ func (p *partitionProducer) internalFlushCurrentBatch() {
                }
 
                if errors.Is(err, internal.ErrExceedMaxMessageSize) {
-                       p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", err)
+                       p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", err)
                }
 
                return
@@ -893,11 +900,11 @@ func (p *partitionProducer) failTimeoutMessages() {
 
                        for _, i := range pi.sendRequests {
                                sr := i.(*sendRequest)
-                               sr.done(nil, errSendTimeout)
+                               sr.done(nil, ErrSendTimeout)
                        }
 
                        // flag the sending has completed with error, flush make no effect
-                       pi.done(errSendTimeout)
+                       pi.done(ErrSendTimeout)
                        pi.Unlock()
 
                        // finally reached the last view item, current iteration ends
@@ -926,7 +933,7 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
                        }
 
                        if errors.Is(errs[i], internal.ErrExceedMaxMessageSize) {
-                               p.log.WithError(errMessageTooLarge).Errorf("internal err: %s", errs[i])
+                               p.log.WithError(ErrMessageTooLarge).Errorf("internal err: %s", errs[i])
                                return
                        }
 
@@ -1019,18 +1026,18 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
 
 func (p *partitionProducer) validateMsg(msg *ProducerMessage) error {
        if msg == nil {
-               return newError(InvalidMessage, "Message is nil")
+               return joinErrors(ErrInvalidMessage, fmt.Errorf("message is nil"))
        }
 
        if msg.Value != nil && msg.Payload != nil {
-               return newError(InvalidMessage, "Can not set Value and Payload both")
+               return joinErrors(ErrInvalidMessage, fmt.Errorf("can not set Value and Payload both"))
        }
 
        if p.options.DisableMultiSchema {
                if msg.Schema != nil && p.options.Schema != nil &&
                        msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
                        p.log.Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
-                       return fmt.Errorf("msg schema can not match with producer schema")
+                       return joinErrors(ErrSchema, fmt.Errorf("msg schema can not match with producer schema"))
                }
        }
 
@@ -1046,15 +1053,16 @@ func (p *partitionProducer) prepareTransaction(sr *sendRequest) error {
        if txn.state != TxnOpen {
                p.log.WithField("state", txn.state).Error("Failed to send 
message" +
                        " by a non-open transaction.")
-               return newError(InvalidStatus, "Failed to send message by a 
non-open transaction.")
+               return joinErrors(ErrTransaction,
+                       fmt.Errorf("failed to send message by a non-open 
transaction"))
        }
 
        if err := txn.registerProducerTopic(p.topic); err != nil {
-               return err
+               return joinErrors(ErrTransaction, err)
        }
 
        if err := txn.registerSendOrAckOp(); err != nil {
-               return err
+               return joinErrors(ErrTransaction, err)
        }
 
        sr.transaction = txn
@@ -1080,7 +1088,7 @@ func (p *partitionProducer) updateSchema(sr *sendRequest) error {
        if schemaVersion == nil {
                schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
                if err != nil {
-                       return fmt.Errorf("get schema version fail, err: %w", err)
+                       return joinErrors(ErrSchema, fmt.Errorf("get schema version fail, err: %w", err))
                }
                p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
        }
@@ -1097,7 +1105,7 @@ func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error {
        if sr.msg.Value != nil {
                if sr.schema == nil {
                        p.log.Errorf("Schema encode message failed %s", 
sr.msg.Value)
-                       return newError(SchemaFailure, "set schema value 
without setting schema")
+                       return joinErrors(ErrSchema, fmt.Errorf("set schema 
value without setting schema"))
                }
 
                // payload and schema are mutually exclusive
@@ -1105,7 +1113,7 @@ func (p *partitionProducer) updateUncompressedPayload(sr *sendRequest) error {
                schemaPayload, err := sr.schema.Encode(sr.msg.Value)
                if err != nil {
                        p.log.WithError(err).Errorf("Schema encode message failed %s", sr.msg.Value)
-                       return newError(SchemaFailure, err.Error())
+                       return joinErrors(ErrSchema, err)
                }
 
                sr.uncompressedPayload = schemaPayload
@@ -1160,11 +1168,11 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
 
        // if msg is too large and chunking is disabled
        if checkSize > int64(sr.maxMessageSize) && !p.options.EnableChunking {
-               p.log.WithError(errMessageTooLarge).
+               p.log.WithError(ErrMessageTooLarge).
                        WithField("size", checkSize).
                        WithField("properties", sr.msg.Properties).
                        Errorf("MaxMessageSize %d", sr.maxMessageSize)
-               return errMessageTooLarge
+               return ErrMessageTooLarge
        }
 
        if sr.sendAsBatch || !p.options.EnableChunking {
@@ -1173,11 +1181,11 @@ func (p *partitionProducer) updateChunkInfo(sr *sendRequest) error {
        } else {
                sr.payloadChunkSize = int(sr.maxMessageSize) - proto.Size(sr.mm)
                if sr.payloadChunkSize <= 0 {
-                       p.log.WithError(errMetaTooLarge).
+                       p.log.WithError(ErrMetaTooLarge).
                                WithField("metadata size", proto.Size(sr.mm)).
                                WithField("properties", sr.msg.Properties).
                                Errorf("MaxMessageSize %d", 
int(p._getConn().GetMaxMessageSize()))
-                       return errMetaTooLarge
+                       return ErrMetaTooLarge
                }
                // set ChunkMaxMessageSize
                if p.options.ChunkMaxMessageSize != 0 {
@@ -1220,7 +1228,7 @@ func (p *partitionProducer) internalSendAsync(
        }
 
        if p.getProducerState() != producerReady {
-               sr.done(nil, errProducerClosed)
+               sr.done(nil, ErrProducerClosed)
                return
        }
 
@@ -1333,7 +1341,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 func (p *partitionProducer) internalClose(req *closeProducer) {
        defer close(req.doneCh)
 
-       p.doClose(errProducerClosed)
+       p.doClose(ErrProducerClosed)
 }
 
 func (p *partitionProducer) doClose(reason error) {
@@ -1508,11 +1516,11 @@ func (sr *sendRequest) done(msgID MessageID, err error) {
                        WithField("properties", sr.msg.Properties)
        }
 
-       if errors.Is(err, errSendTimeout) {
+       if errors.Is(err, ErrSendTimeout) {
                sr.producer.metrics.PublishErrorsTimeout.Inc()
        }
 
-       if errors.Is(err, errMessageTooLarge) {
+       if errors.Is(err, ErrMessageTooLarge) {
                sr.producer.metrics.PublishErrorsMsgTooLarge.Inc()
        }
 
@@ -1554,7 +1562,7 @@ func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
        for i := 0; i < sr.totalChunks; i++ {
                if p.blockIfQueueFull() {
                        if !p.publishSemaphore.Acquire(sr.ctx) {
-                               return errContextExpired
+                               return ErrContextExpired
                        }
 
                        // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
@@ -1564,7 +1572,7 @@ func (p *partitionProducer) reserveSemaphore(sr *sendRequest) error {
                        p.metrics.MessagesPending.Inc()
                } else {
                        if !p.publishSemaphore.TryAcquire() {
-                               return errSendQueueIsFull
+                               return ErrSendQueueIsFull
                        }
 
                        // update sr.semaphore and sr.reservedSemaphore here so that we can release semaphore in the case
@@ -1586,11 +1594,11 @@ func (p *partitionProducer) reserveMem(sr *sendRequest) error {
 
        if p.blockIfQueueFull() {
                if !p.client.memLimit.ReserveMemory(sr.ctx, requiredMem) {
-                       return errContextExpired
+                       return ErrContextExpired
                }
        } else {
                if !p.client.memLimit.TryReserveMemory(requiredMem) {
-                       return errMemoryBufferIsFull
+                       return ErrMemoryBufferIsFull
                }
        }
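
With the paths above joining the exported sentinels into the returned errors, callers can classify failures without string matching or type assertions. A sketch, assuming a pulsar.Producer has already been created (construction elided; see the earlier sketch).

package example

import (
    "context"
    "errors"
    "log"

    "github.com/apache/pulsar-client-go/pulsar"
)

// publish sends one message asynchronously and classifies any failure via
// the exported sentinels, which errors.Is() matches even when they are
// joined with an underlying cause.
func publish(producer pulsar.Producer) {
    producer.SendAsync(context.Background(), &pulsar.ProducerMessage{
        Payload: []byte("event"),
    }, func(id pulsar.MessageID, msg *pulsar.ProducerMessage, err error) {
        switch {
        case err == nil:
            // acked by the broker
        case errors.Is(err, pulsar.ErrSendTimeout):
            // transient: safe to retry if the message is idempotent
        case errors.Is(err, pulsar.ErrTopicTerminated), errors.Is(err, pulsar.ErrProducerFenced):
            // terminal: stop producing to this topic
        default:
            log.Println("publish failed:", err)
        }
    })
}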
 
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 0f890692..0d74cdee 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -1061,8 +1061,8 @@ func TestMaxMessageSize(t *testing.T) {
        assert.NoError(t, err)
        defer client.Close()
 
-       // Need to set BatchingMaxSize > serverMaxMessageSize to avoid errMessageTooLarge
-       // being masked by an earlier errFailAddToBatch
+       // Need to set BatchingMaxSize > serverMaxMessageSize to avoid ErrMessageTooLarge
+       // being masked by an earlier ErrFailAddToBatch
        producer, err := client.CreateProducer(ProducerOptions{
                Topic:           newTopicName(),
                BatchingMaxSize: uint(2 * serverMaxMessageSize),
@@ -1088,7 +1088,7 @@ func TestMaxMessageSize(t *testing.T) {
        // So when bias <= 0, the uncompressed payload will not exceed maxMessageSize,
        // but encryptedPayloadSize exceeds maxMessageSize, Send() will return an internal error.
        // When bias = 1, the first check of maxMessageSize (for uncompressed payload) is valid,
-       // Send() will return errMessageTooLarge
+       // Send() will return ErrMessageTooLarge
        for bias := -1; bias <= 1; bias++ {
                payload := make([]byte, serverMaxMessageSize+bias)
                ID, err := producer.Send(context.Background(), &ProducerMessage{
@@ -1098,7 +1098,7 @@ func TestMaxMessageSize(t *testing.T) {
                        assert.Equal(t, true, errors.Is(err, internal.ErrExceedMaxMessageSize))
                        assert.Nil(t, ID)
                } else {
-                       assert.Equal(t, errMessageTooLarge, err)
+                       assert.True(t, errors.Is(err, ErrMessageTooLarge))
                }
        }
 
@@ -1111,7 +1111,7 @@ func TestMaxMessageSize(t *testing.T) {
                        assert.Equal(t, true, errors.Is(err, internal.ErrExceedMaxMessageSize))
                        assert.Nil(t, ID)
                } else {
-                       assert.Equal(t, errMessageTooLarge, err)
+                       assert.True(t, errors.Is(err, ErrMessageTooLarge))
                }
        }
 }
@@ -1190,8 +1190,7 @@ func TestTopicTermination(t *testing.T) {
                                Payload: make([]byte, 1024),
                        })
                        if err != nil {
-                               e := err.(*Error)
-                               if e.result == TopicTerminated || err == errProducerClosed {
+                               if errors.Is(err, ErrTopicTerminated) || errors.Is(err, ErrProducerClosed) {
                                        terminatedChan <- true
                                } else {
                                        terminatedChan <- false
@@ -2348,7 +2347,7 @@ func TestFailPendingMessageWithClose(t *testing.T) {
                        Payload: make([]byte, 1024),
                }, func(id MessageID, message *ProducerMessage, e error) {
                        if e != nil {
-                               assert.Equal(t, errProducerClosed, e)
+                               assert.True(t, errors.Is(e, ErrProducerClosed))
                        }
                })
        }
