This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e1f9c4  fix: use newError to build return error (#471)
2e1f9c4 is described below

commit 2e1f9c43d853774e69141ce6541bac5ff305fa34
Author: jony montana <[email protected]>
AuthorDate: Fri Mar 5 17:43:35 2021 +0800

    fix: use newError to build return error (#471)
    
    Signed-off-by: jonyhy96 <[email protected]>
    
    Fixes #444
    
    ### Motivation
    
    To allow better error assertions on the client side.
    
    ### Modifications
    
    Fix newError and use it to build returned errors in most cases.
---
 pulsar/batcher_builder.go     |   4 +-
 pulsar/client_impl.go         |  13 ++-
 pulsar/client_impl_test.go    |   2 +-
 pulsar/consumer_impl.go       |  13 ++-
 pulsar/consumer_multitopic.go |   9 +-
 pulsar/consumer_regex.go      |   9 +-
 pulsar/consumer_test.go       |   2 +-
 pulsar/dlq_router.go          |   5 +-
 pulsar/error.go               | 193 ++++++++++++++++++++++++++++++------------
 pulsar/producer_impl.go       |   2 +-
 pulsar/producer_partition.go  |  11 ++-
 pulsar/producer_test.go       |   2 +-
 pulsar/reader_impl.go         |   8 +-
 pulsar/retry_router.go        |   5 +-
 pulsar/schema.go              |  13 ++-
 15 files changed, 184 insertions(+), 107 deletions(-)

diff --git a/pulsar/batcher_builder.go b/pulsar/batcher_builder.go
index caefa8d..05e059b 100644
--- a/pulsar/batcher_builder.go
+++ b/pulsar/batcher_builder.go
@@ -18,8 +18,6 @@
 package pulsar
 
 import (
-       "errors"
-
        "github.com/apache/pulsar-client-go/pulsar/internal"
 )
 
@@ -39,6 +37,6 @@ func GetBatcherBuilderProvider(typ BatcherBuilderType) (
        case KeyBasedBatchBuilder:
                return internal.NewKeyBasedBatchBuilder, nil
        default:
-               return nil, errors.New("unsupported batcher builder provider 
type")
+               return nil, newError(InvalidBatchBuilderType, "unsupported 
batcher builder provider type")
        }
 }
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 7acbaff..5882dd4 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -18,7 +18,6 @@
 package pulsar
 
 import (
-       "errors"
        "fmt"
        "net/url"
        "time"
@@ -57,13 +56,13 @@ func newClient(options ClientOptions) (Client, error) {
        }
 
        if options.URL == "" {
-               return nil, newError(ResultInvalidConfiguration, "URL is 
required for client")
+               return nil, newError(InvalidConfiguration, "URL is required for 
client")
        }
 
        url, err := url.Parse(options.URL)
        if err != nil {
                logger.WithError(err).Error("Failed to parse service URL")
-               return nil, newError(ResultInvalidConfiguration, "Invalid 
service URL")
+               return nil, newError(InvalidConfiguration, "Invalid service 
URL")
        }
 
        var tlsConfig *internal.TLSOptions
@@ -77,7 +76,7 @@ func newClient(options ClientOptions) (Client, error) {
                        ValidateHostname:        options.TLSValidateHostname,
                }
        default:
-               return nil, newError(ResultInvalidConfiguration, 
fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
+               return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid 
URL scheme '%s'", url.Scheme))
        }
 
        var authProvider auth.Provider
@@ -88,7 +87,7 @@ func newClient(options ClientOptions) (Client, error) {
        } else {
                authProvider, ok = options.Authentication.(auth.Provider)
                if !ok {
-                       return nil, errors.New("invalid auth provider 
interface")
+                       return nil, newError(AuthenticationError, "invalid auth 
provider interface")
                }
        }
        err = authProvider.Init()
@@ -169,7 +168,7 @@ func (c *client) TopicPartitions(topic string) ([]string, 
error) {
        }
        if r != nil {
                if r.Error != nil {
-                       return nil, newError(ResultLookupError, 
r.GetError().String())
+                       return nil, newError(LookupError, r.GetError().String())
                }
 
                if r.GetPartitions() > 0 {
@@ -201,7 +200,7 @@ func (c *client) namespaceTopics(namespace string) 
([]string, error) {
                return nil, err
        }
        if res.Response.Error != nil {
-               return []string{}, newError(ResultLookupError, 
res.Response.GetError().String())
+               return []string{}, newError(LookupError, 
res.Response.GetError().String())
        }
 
        return res.Response.GetTopicsOfNamespaceResponse.GetTopics(), nil
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 96fd68e..661eac2 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -35,7 +35,7 @@ func TestClient(t *testing.T) {
        client, err := NewClient(ClientOptions{})
        assert.Nil(t, client)
        assert.NotNil(t, err)
-       assert.Equal(t, ResultInvalidConfiguration, err.(*Error).Result())
+       assert.Equal(t, InvalidConfiguration, err.(*Error).Result())
 }
 
 func TestTLSConnectionCAError(t *testing.T) {
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 6379993..d05c495 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -19,7 +19,6 @@ package pulsar
 
 import (
        "context"
-       "errors"
        "fmt"
        "math/rand"
        "strconv"
@@ -31,8 +30,6 @@ import (
        "github.com/apache/pulsar-client-go/pulsar/log"
 )
 
-var ErrConsumerClosed = errors.New("consumer closed")
-
 const defaultNackRedeliveryDelay = 1 * time.Minute
 
 type acker interface {
@@ -182,7 +179,7 @@ func newConsumer(client *client, options ConsumerOptions) 
(Consumer, error) {
                return newRegexConsumer(client, options, tn, pattern, 
messageCh, dlq, rlq)
        }
 
-       return nil, newError(ResultInvalidTopicName, "topic name is required 
for consumer")
+       return nil, newError(InvalidTopicName, "topic name is required for 
consumer")
 }
 
 func newInternalConsumer(client *client, options ConsumerOptions, topic string,
@@ -382,10 +379,10 @@ func (c *consumer) Receive(ctx context.Context) (message 
Message, err error) {
        for {
                select {
                case <-c.closeCh:
-                       return nil, ErrConsumerClosed
+                       return nil, newError(ConsumerClosed, "consumer closed")
                case cm, ok := <-c.messageCh:
                        if !ok {
-                               return nil, ErrConsumerClosed
+                               return nil, newError(ConsumerClosed, "consumer 
closed")
                        }
                        return cm.Message, nil
                case <-ctx.Done():
@@ -515,7 +512,7 @@ func (c *consumer) Seek(msgID MessageID) error {
        defer c.Unlock()
 
        if len(c.consumers) > 1 {
-               return errors.New("for partition topic, seek command should 
perform on the individual partitions")
+               return newError(SeekFailed, "for partition topic, seek command 
should perform on the individual partitions")
        }
 
        mid, ok := c.messageID(msgID)
@@ -530,7 +527,7 @@ func (c *consumer) SeekByTime(time time.Time) error {
        c.Lock()
        defer c.Unlock()
        if len(c.consumers) > 1 {
-               return errors.New("for partition topic, seek command should 
perform on the individual partitions")
+               return newError(SeekFailed, "for partition topic, seek command 
should perform on the individual partitions")
        }
 
        return c.consumers[0].SeekByTime(time)
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 479a12a..dc4ad7b 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -19,7 +19,6 @@ package pulsar
 
 import (
        "context"
-       "errors"
        "fmt"
        "sync"
        "time"
@@ -98,10 +97,10 @@ func (c *multiTopicConsumer) Receive(ctx context.Context) 
(message Message, err
        for {
                select {
                case <-c.closeCh:
-                       return nil, ErrConsumerClosed
+                       return nil, newError(ConsumerClosed, "consumer closed")
                case cm, ok := <-c.messageCh:
                        if !ok {
-                               return nil, ErrConsumerClosed
+                               return nil, newError(ConsumerClosed, "consumer 
closed")
                        }
                        return cm.Message, nil
                case <-ctx.Done():
@@ -193,11 +192,11 @@ func (c *multiTopicConsumer) Close() {
 }
 
 func (c *multiTopicConsumer) Seek(msgID MessageID) error {
-       return errors.New("seek command not allowed for multi topic consumer")
+       return newError(SeekFailed, "seek command not allowed for multi topic 
consumer")
 }
 
 func (c *multiTopicConsumer) SeekByTime(time time.Time) error {
-       return errors.New("seek command not allowed for multi topic consumer")
+       return newError(SeekFailed, "seek command not allowed for multi topic 
consumer")
 }
 
 // Name returns the name of consumer.
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index aaa786a..7072370 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -19,7 +19,6 @@ package pulsar
 
 import (
        "context"
-       "errors"
        "fmt"
        "regexp"
        "strings"
@@ -141,10 +140,10 @@ func (c *regexConsumer) Receive(ctx context.Context) 
(message Message, err error
        for {
                select {
                case <-c.closeCh:
-                       return nil, ErrConsumerClosed
+                       return nil, newError(ConsumerClosed, "consumer closed")
                case cm, ok := <-c.messageCh:
                        if !ok {
-                               return nil, ErrConsumerClosed
+                               return nil, newError(ConsumerClosed, "consumer 
closed")
                        }
                        return cm.Message, nil
                case <-ctx.Done():
@@ -224,11 +223,11 @@ func (c *regexConsumer) Close() {
 }
 
 func (c *regexConsumer) Seek(msgID MessageID) error {
-       return errors.New("seek command not allowed for regex consumer")
+       return newError(SeekFailed, "seek command not allowed for regex 
consumer")
 }
 
 func (c *regexConsumer) SeekByTime(time time.Time) error {
-       return errors.New("seek command not allowed for regex consumer")
+       return newError(SeekFailed, "seek command not allowed for regex 
consumer")
 }
 
 // Name returns the name of consumer.
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index f0b27c6..f512b00 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -948,7 +948,7 @@ func TestConsumerReceiveErrAfterClose(t *testing.T) {
        case <-time.After(200 * time.Millisecond):
        case err = <-errorCh:
        }
-       assert.Equal(t, ErrConsumerClosed, err)
+       assert.Equal(t, ConsumerClosed, err.(*Error).result)
 }
 
 func TestDLQ(t *testing.T) {
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 40761e2..c858946 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -19,7 +19,6 @@ package pulsar
 
 import (
        "context"
-       "errors"
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
@@ -44,11 +43,11 @@ func newDlqRouter(client Client, policy *DLQPolicy, logger 
log.Logger) (*dlqRout
 
        if policy != nil {
                if policy.MaxDeliveries <= 0 {
-                       return nil, errors.New("DLQPolicy.MaxDeliveries needs 
to be > 0")
+                       return nil, newError(InvalidConfiguration, 
"DLQPolicy.MaxDeliveries needs to be > 0")
                }
 
                if policy.DeadLetterTopic == "" {
-                       return nil, errors.New("DLQPolicy.Topic needs to be set 
to a valid topic name")
+                       return nil, newError(InvalidConfiguration, 
"DLQPolicy.Topic needs to be set to a valid topic name")
                }
 
                r.messageCh = make(chan ConsumerMessage)
diff --git a/pulsar/error.go b/pulsar/error.go
index b98ce88..60a832b 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -23,54 +23,82 @@ import "fmt"
 type Result int
 
 const (
-       // ResultOk means no errors
-       ResultOk Result = iota
-       // ResultUnknownError means unknown error happened on broker
-       ResultUnknownError
-       // ResultInvalidConfiguration means invalid configuration
-       ResultInvalidConfiguration
-       // ResultTimeoutError means operation timed out
-       ResultTimeoutError
-       // ResultLookupError means broker lookup failed
-       ResultLookupError
-       // ResultInvalidTopicName means invalid topic name
-       ResultInvalidTopicName
-       // ResultConnectError means failed to connect to broker
-       ResultConnectError
-
+       // Ok means no errors
+       Ok Result = iota
+       // UnknownError means unknown error happened on broker
+       UnknownError
+       // InvalidConfiguration means invalid configuration
+       InvalidConfiguration
+       // TimeoutError means operation timed out
+       TimeoutError
+       //LookupError means broker lookup failed
+       LookupError
+       // ConnectError means failed to connect to broker
+       ConnectError
        // ReadError means failed to read from socket
-       //ReadError                      Result = 6
+       ReadError
        // AuthenticationError means authentication failed on broker
-       //AuthenticationError            Result = 7
-       //AuthorizationError             Result = 8
-       //ErrorGettingAuthenticationData Result = 9  // Client cannot find 
authorization data
-       //BrokerMetadataError            Result = 10 // Broker failed in 
updating metadata
-       //BrokerPersistenceError         Result = 11 // Broker failed to 
persist entry
-       //ChecksumError                  Result = 12 // Corrupt message 
checksum failure
+       AuthenticationError
+       // AuthorizationError client is not authorized to create 
producer/consumer
+       AuthorizationError
+       // ErrorGettingAuthenticationData client cannot find authorization data
+       ErrorGettingAuthenticationData
+       // BrokerMetadataError broker failed in updating metadata
+       BrokerMetadataError
+       // BrokerPersistenceError broker failed to persist entry
+       BrokerPersistenceError
+       // ChecksumError corrupt message checksum failure
+       ChecksumError
        // ConsumerBusy means Exclusive consumer is already connected
-       ConsumerBusy Result = 13
-       //NotConnectedError              Result = 14 // Producer/Consumer is 
not currently connected to broker
-       //AlreadyClosedError             Result = 15 // Producer/Consumer is 
already closed and not accepting any operation
-       //InvalidMessage                 Result = 16 // Error in publishing an 
already used message
-       //ConsumerNotInitialized         Result = 17 // Consumer is not 
initialized
-       //ProducerNotInitialized         Result = 18 // Producer is not 
initialized
-       //TooManyLookupRequestException  Result = 19 // Too Many concurrent 
LookupRequest
-       // InvalidUrl means Client Initialized with Invalid Broker Url (VIP Url 
passed to Client Constructor)
-       //InvalidUrl                            Result = 21
+       ConsumerBusy
+       // NotConnectedError producer/consumer is not currently connected to 
broker
+       NotConnectedError
+       // AlreadyClosedError producer/consumer is already closed and not 
accepting any operation
+       AlreadyClosedError
+       // InvalidMessage error in publishing an already used message
+       InvalidMessage
+       // ConsumerNotInitialized consumer is not initialized
+       ConsumerNotInitialized
+       // ProducerNotInitialized producer is not initialized
+       ProducerNotInitialized
+       // TooManyLookupRequestException too many concurrent LookupRequest
+       TooManyLookupRequestException
+       // InvalidTopicName means invalid topic name
+       InvalidTopicName
+       // InvalidURL means Client Initialized with Invalid Broker Url (VIP Url 
passed to Client Constructor)
+       InvalidURL
        // ServiceUnitNotReady unloaded between client did lookup and 
producer/consumer got created
-       //ServiceUnitNotReady                   Result = 22
-       //OperationNotSupported                 Result = 23
-       //ProducerBlockedQuotaExceededError     Result = 24 // Producer is 
blocked
-       //ProducerBlockedQuotaExceededException Result = 25 // Producer is 
getting exception
-       //ProducerQueueIsFull                   Result = 26 // Producer queue 
is full
-       //MessageTooBig                         Result = 27 // Trying to send a 
messages exceeding the max size
-       TopicNotFound        Result = 28 // Topic not found
-       SubscriptionNotFound Result = 29 // Subscription not found
-       //ConsumerNotFound                      Result = 30 // Consumer not 
found
+       ServiceUnitNotReady
+       // OperationNotSupported operation not supported
+       OperationNotSupported
+       // ProducerBlockedQuotaExceededError producer is blocked
+       ProducerBlockedQuotaExceededError
+       // ProducerBlockedQuotaExceededException producer is getting exception
+       ProducerBlockedQuotaExceededException
+       // ProducerQueueIsFull producer queue is full
+       ProducerQueueIsFull
+       // MessageTooBig trying to send a messages exceeding the max size
+       MessageTooBig
+       // TopicNotFound topic not found
+       TopicNotFound
+       // SubscriptionNotFound subscription not found
+       SubscriptionNotFound
+       // ConsumerNotFound consumer not found
+       ConsumerNotFound
        // UnsupportedVersionError when an older client/version doesn't support 
a required feature
-       //UnsupportedVersionError               Result = 31
-       //TopicTerminated                       Result = 32 // Topic was 
already terminated
-       //CryptoError                           Result = 33 // Error when 
crypto operation fails
+       UnsupportedVersionError
+       // TopicTerminated topic was already terminated
+       TopicTerminated
+       // CryptoError error when crypto operation fails
+       CryptoError
+       // ConsumerClosed means consumer already been closed
+       ConsumerClosed
+       // InvalidBatchBuilderType invalid batch builder type
+       InvalidBatchBuilderType
+       // AddToBatchFailed failed to add sendRequest to batchBuilder
+       AddToBatchFailed
+       // SeekFailed seek failed
+       SeekFailed
 )
 
 // Error implement error interface, composed of two parts: msg and result.
@@ -79,6 +107,7 @@ type Error struct {
        result Result
 }
 
+// Result get error's original result.
 func (e *Error) Result() Result {
        return e.result
 }
@@ -96,22 +125,82 @@ func newError(result Result, msg string) error {
 
 func getResultStr(r Result) string {
        switch r {
-       case ResultOk:
+       case Ok:
                return "OK"
-       case ResultUnknownError:
-               return "Unknown error"
-       case ResultInvalidConfiguration:
+       case UnknownError:
+               return "UnknownError"
+       case InvalidConfiguration:
                return "InvalidConfiguration"
-       case ResultTimeoutError:
+       case TimeoutError:
                return "TimeoutError"
-       case ResultLookupError:
+       case LookupError:
                return "LookupError"
-       case ResultInvalidTopicName:
-               return "InvalidTopicName"
-       case ResultConnectError:
+       case ConnectError:
                return "ConnectError"
+       case ReadError:
+               return "ReadError"
+       case AuthenticationError:
+               return "AuthenticationError"
+       case AuthorizationError:
+               return "AuthorizationError"
+       case ErrorGettingAuthenticationData:
+               return "ErrorGettingAuthenticationData"
+       case BrokerMetadataError:
+               return "BrokerMetadataError"
+       case BrokerPersistenceError:
+               return "BrokerPersistenceError"
+       case ChecksumError:
+               return "ChecksumError"
        case ConsumerBusy:
                return "ConsumerBusy"
+       case NotConnectedError:
+               return "NotConnectedError"
+       case AlreadyClosedError:
+               return "AlreadyClosedError"
+       case InvalidMessage:
+               return "InvalidMessage"
+       case ConsumerNotInitialized:
+               return "ConsumerNotInitialized"
+       case ProducerNotInitialized:
+               return "ProducerNotInitialized"
+       case TooManyLookupRequestException:
+               return "TooManyLookupRequestException"
+       case InvalidTopicName:
+               return "InvalidTopicName"
+       case InvalidURL:
+               return "InvalidURL"
+       case ServiceUnitNotReady:
+               return "ServiceUnitNotReady"
+       case OperationNotSupported:
+               return "OperationNotSupported"
+       case ProducerBlockedQuotaExceededError:
+               return "ProducerBlockedQuotaExceededError"
+       case ProducerBlockedQuotaExceededException:
+               return "ProducerBlockedQuotaExceededException"
+       case ProducerQueueIsFull:
+               return "ProducerQueueIsFull"
+       case MessageTooBig:
+               return "MessageTooBig"
+       case TopicNotFound:
+               return "TopicNotFound"
+       case SubscriptionNotFound:
+               return "SubscriptionNotFound"
+       case ConsumerNotFound:
+               return "ConsumerNotFound"
+       case UnsupportedVersionError:
+               return "UnsupportedVersionError"
+       case TopicTerminated:
+               return "TopicTerminated"
+       case CryptoError:
+               return "CryptoError"
+       case ConsumerClosed:
+               return "ConsumerClosed"
+       case InvalidBatchBuilderType:
+               return "InvalidBatchBuilderType"
+       case AddToBatchFailed:
+               return "AddToBatchFailed"
+       case SeekFailed:
+               return "SeekFailed"
        default:
                return fmt.Sprintf("Result(%d)", r)
        }
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 73aadb1..bf4d92e 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -72,7 +72,7 @@ func getHashingFunction(s HashingScheme) func(string) uint32 {
 
 func newProducer(client *client, options *ProducerOptions) (*producer, error) {
        if options.Topic == "" {
-               return nil, newError(ResultInvalidTopicName, "Topic name is 
required for producer")
+               return nil, newError(InvalidTopicName, "Topic name is required 
for producer")
        }
 
        if options.SendTimeout == 0 {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 34def20..87c97ed 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,7 +19,6 @@ package pulsar
 
 import (
        "context"
-       "errors"
        "sync"
        "sync/atomic"
        "time"
@@ -46,10 +45,10 @@ const (
 )
 
 var (
-       errFailAddBatch    = errors.New("message send failed")
-       errSendTimeout     = errors.New("message send timeout")
-       errSendQueueIsFull = errors.New("producer send queue is full")
-       errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
+       errFailAddToBatch  = newError(AddToBatchFailed, "message add to batch 
failed")
+       errSendTimeout     = newError(TimeoutError, "message send timeout")
+       errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue 
is full")
+       errMessageTooLarge = newError(MessageTooBig, "message size exceeds 
MaxMessageSize")
 
        buffersPool sync.Pool
 )
@@ -407,7 +406,7 @@ func (p *partitionProducer) internalSend(request 
*sendRequest) {
                if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, 
payload, request,
                        msg.ReplicationClusters, deliverAt); !ok {
                        p.publishSemaphore.Release()
-                       request.callback(nil, request.msg, errFailAddBatch)
+                       request.callback(nil, request.msg, errFailAddToBatch)
                        p.log.WithField("size", len(payload)).
                                WithField("properties", msg.Properties).
                                Error("unable to add message to batch")
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 9475051..5708cad 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -78,7 +78,7 @@ func TestProducerNoTopic(t *testing.T) {
        assert.Nil(t, producer)
        assert.NotNil(t, err)
 
-       assert.Equal(t, err.(*Error).Result(), ResultInvalidTopicName)
+       assert.Equal(t, InvalidTopicName, err.(*Error).Result())
 }
 
 func TestSimpleProducer(t *testing.T) {
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index d76865e..a019d9c 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -42,11 +42,11 @@ type reader struct {
 
 func newReader(client *client, options ReaderOptions) (Reader, error) {
        if options.Topic == "" {
-               return nil, newError(ResultInvalidConfiguration, "Topic is 
required")
+               return nil, newError(InvalidConfiguration, "Topic is required")
        }
 
        if options.StartMessageID == nil {
-               return nil, newError(ResultInvalidConfiguration, 
"StartMessageID is required")
+               return nil, newError(InvalidConfiguration, "StartMessageID is 
required")
        }
 
        startMessageID, ok := toTrackingMessageID(options.StartMessageID)
@@ -122,7 +122,7 @@ func (r *reader) Next(ctx context.Context) (Message, error) 
{
                select {
                case cm, ok := <-r.messageCh:
                        if !ok {
-                               return nil, ErrConsumerClosed
+                               return nil, newError(ConsumerClosed, "consumer 
closed")
                        }
 
                        // Acknowledge message immediately because the reader 
is based on non-durable subscription. When it reconnects,
@@ -133,7 +133,7 @@ func (r *reader) Next(ctx context.Context) (Message, error) 
{
                                r.pc.AckID(mid)
                                return cm.Message, nil
                        }
-                       return nil, fmt.Errorf("invalid message id type %T", 
msgID)
+                       return nil, newError(InvalidMessage, 
fmt.Sprintf("invalid message id type %T", msgID))
                case <-ctx.Done():
                        return nil, ctx.Err()
                }
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index 3eb12f4..0e9e6be 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -19,7 +19,6 @@ package pulsar
 
 import (
        "context"
-       "errors"
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal"
@@ -61,11 +60,11 @@ func newRetryRouter(client Client, policy *DLQPolicy, 
retryEnabled bool, logger
 
        if policy != nil && retryEnabled {
                if policy.MaxDeliveries <= 0 {
-                       return nil, errors.New("DLQPolicy.MaxDeliveries needs 
to be > 0")
+                       return nil, newError(InvalidConfiguration, 
"DLQPolicy.MaxDeliveries needs to be > 0")
                }
 
                if policy.RetryLetterTopic == "" {
-                       return nil, errors.New("DLQPolicy.RetryLetterTopic 
needs to be set to a valid topic name")
+                       return nil, newError(InvalidConfiguration, 
"DLQPolicy.RetryLetterTopic needs to be set to a valid topic name")
                }
 
                r.messageCh = make(chan RetryMessage)
diff --git a/pulsar/schema.go b/pulsar/schema.go
index 7885603..8021ffe 100644
--- a/pulsar/schema.go
+++ b/pulsar/schema.go
@@ -20,7 +20,6 @@ package pulsar
 import (
        "bytes"
        "encoding/json"
-       "errors"
        "reflect"
        "unsafe"
 
@@ -311,7 +310,7 @@ func (is8 *Int8Schema) Decode(data []byte, v interface{}) 
error {
 
 func (is8 *Int8Schema) Validate(message []byte) error {
        if len(message) != 1 {
-               return errors.New("size of data received by Int8Schema is not 
1")
+               return newError(InvalidMessage, "size of data received by 
Int8Schema is not 1")
        }
        return nil
 }
@@ -346,7 +345,7 @@ func (is16 *Int16Schema) Decode(data []byte, v interface{}) 
error {
 
 func (is16 *Int16Schema) Validate(message []byte) error {
        if len(message) != 2 {
-               return errors.New("size of data received by Int16Schema is not 
2")
+               return newError(InvalidMessage, "size of data received by 
Int16Schema is not 2")
        }
        return nil
 }
@@ -381,7 +380,7 @@ func (is32 *Int32Schema) Decode(data []byte, v interface{}) 
error {
 
 func (is32 *Int32Schema) Validate(message []byte) error {
        if len(message) != 4 {
-               return errors.New("size of data received by Int32Schema is not 
4")
+               return newError(InvalidMessage, "size of data received by 
Int32Schema is not 4")
        }
        return nil
 }
@@ -416,7 +415,7 @@ func (is64 *Int64Schema) Decode(data []byte, v interface{}) 
error {
 
 func (is64 *Int64Schema) Validate(message []byte) error {
        if len(message) != 8 {
-               return errors.New("size of data received by Int64Schema is not 
8")
+               return newError(InvalidMessage, "size of data received by 
Int64Schema is not 8")
        }
        return nil
 }
@@ -454,7 +453,7 @@ func (fs *FloatSchema) Decode(data []byte, v interface{}) 
error {
 
 func (fs *FloatSchema) Validate(message []byte) error {
        if len(message) != 4 {
-               return errors.New("size of data received by FloatSchema is not 
4")
+               return newError(InvalidMessage, "size of data received by 
FloatSchema is not 4")
        }
        return nil
 }
@@ -492,7 +491,7 @@ func (ds *DoubleSchema) Decode(data []byte, v interface{}) 
error {
 
 func (ds *DoubleSchema) Validate(message []byte) error {
        if len(message) != 8 {
-               return errors.New("size of data received by DoubleSchema is not 
8")
+               return newError(InvalidMessage, "size of data received by 
DoubleSchema is not 8")
        }
        return nil
 }

Reply via email to