This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 2e1f9c4 fix: use newError to build return error (#471)
2e1f9c4 is described below
commit 2e1f9c43d853774e69141ce6541bac5ff305fa34
Author: jony montana <[email protected]>
AuthorDate: Fri Mar 5 17:43:35 2021 +0800
fix: use newError to build return error (#471)
Signed-off-by: jonyhy96 <[email protected]>
Fixes #444
### Motivation
To allow better error assertions on the client side.
### Modifications
Fix newError and use newError to build returned errors in most cases.
---
pulsar/batcher_builder.go | 4 +-
pulsar/client_impl.go | 13 ++-
pulsar/client_impl_test.go | 2 +-
pulsar/consumer_impl.go | 13 ++-
pulsar/consumer_multitopic.go | 9 +-
pulsar/consumer_regex.go | 9 +-
pulsar/consumer_test.go | 2 +-
pulsar/dlq_router.go | 5 +-
pulsar/error.go | 193 ++++++++++++++++++++++++++++++------------
pulsar/producer_impl.go | 2 +-
pulsar/producer_partition.go | 11 ++-
pulsar/producer_test.go | 2 +-
pulsar/reader_impl.go | 8 +-
pulsar/retry_router.go | 5 +-
pulsar/schema.go | 13 ++-
15 files changed, 184 insertions(+), 107 deletions(-)
diff --git a/pulsar/batcher_builder.go b/pulsar/batcher_builder.go
index caefa8d..05e059b 100644
--- a/pulsar/batcher_builder.go
+++ b/pulsar/batcher_builder.go
@@ -18,8 +18,6 @@
package pulsar
import (
- "errors"
-
"github.com/apache/pulsar-client-go/pulsar/internal"
)
@@ -39,6 +37,6 @@ func GetBatcherBuilderProvider(typ BatcherBuilderType) (
case KeyBasedBatchBuilder:
return internal.NewKeyBasedBatchBuilder, nil
default:
- return nil, errors.New("unsupported batcher builder provider
type")
+ return nil, newError(InvalidBatchBuilderType, "unsupported
batcher builder provider type")
}
}
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index 7acbaff..5882dd4 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -18,7 +18,6 @@
package pulsar
import (
- "errors"
"fmt"
"net/url"
"time"
@@ -57,13 +56,13 @@ func newClient(options ClientOptions) (Client, error) {
}
if options.URL == "" {
- return nil, newError(ResultInvalidConfiguration, "URL is
required for client")
+ return nil, newError(InvalidConfiguration, "URL is required for
client")
}
url, err := url.Parse(options.URL)
if err != nil {
logger.WithError(err).Error("Failed to parse service URL")
- return nil, newError(ResultInvalidConfiguration, "Invalid
service URL")
+ return nil, newError(InvalidConfiguration, "Invalid service
URL")
}
var tlsConfig *internal.TLSOptions
@@ -77,7 +76,7 @@ func newClient(options ClientOptions) (Client, error) {
ValidateHostname: options.TLSValidateHostname,
}
default:
- return nil, newError(ResultInvalidConfiguration,
fmt.Sprintf("Invalid URL scheme '%s'", url.Scheme))
+ return nil, newError(InvalidConfiguration, fmt.Sprintf("Invalid
URL scheme '%s'", url.Scheme))
}
var authProvider auth.Provider
@@ -88,7 +87,7 @@ func newClient(options ClientOptions) (Client, error) {
} else {
authProvider, ok = options.Authentication.(auth.Provider)
if !ok {
- return nil, errors.New("invalid auth provider
interface")
+ return nil, newError(AuthenticationError, "invalid auth
provider interface")
}
}
err = authProvider.Init()
@@ -169,7 +168,7 @@ func (c *client) TopicPartitions(topic string) ([]string,
error) {
}
if r != nil {
if r.Error != nil {
- return nil, newError(ResultLookupError,
r.GetError().String())
+ return nil, newError(LookupError, r.GetError().String())
}
if r.GetPartitions() > 0 {
@@ -201,7 +200,7 @@ func (c *client) namespaceTopics(namespace string)
([]string, error) {
return nil, err
}
if res.Response.Error != nil {
- return []string{}, newError(ResultLookupError,
res.Response.GetError().String())
+ return []string{}, newError(LookupError,
res.Response.GetError().String())
}
return res.Response.GetTopicsOfNamespaceResponse.GetTopics(), nil
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 96fd68e..661eac2 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -35,7 +35,7 @@ func TestClient(t *testing.T) {
client, err := NewClient(ClientOptions{})
assert.Nil(t, client)
assert.NotNil(t, err)
- assert.Equal(t, ResultInvalidConfiguration, err.(*Error).Result())
+ assert.Equal(t, InvalidConfiguration, err.(*Error).Result())
}
func TestTLSConnectionCAError(t *testing.T) {
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 6379993..d05c495 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -19,7 +19,6 @@ package pulsar
import (
"context"
- "errors"
"fmt"
"math/rand"
"strconv"
@@ -31,8 +30,6 @@ import (
"github.com/apache/pulsar-client-go/pulsar/log"
)
-var ErrConsumerClosed = errors.New("consumer closed")
-
const defaultNackRedeliveryDelay = 1 * time.Minute
type acker interface {
@@ -182,7 +179,7 @@ func newConsumer(client *client, options ConsumerOptions)
(Consumer, error) {
return newRegexConsumer(client, options, tn, pattern,
messageCh, dlq, rlq)
}
- return nil, newError(ResultInvalidTopicName, "topic name is required
for consumer")
+ return nil, newError(InvalidTopicName, "topic name is required for
consumer")
}
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
@@ -382,10 +379,10 @@ func (c *consumer) Receive(ctx context.Context) (message
Message, err error) {
for {
select {
case <-c.closeCh:
- return nil, ErrConsumerClosed
+ return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-c.messageCh:
if !ok {
- return nil, ErrConsumerClosed
+ return nil, newError(ConsumerClosed, "consumer
closed")
}
return cm.Message, nil
case <-ctx.Done():
@@ -515,7 +512,7 @@ func (c *consumer) Seek(msgID MessageID) error {
defer c.Unlock()
if len(c.consumers) > 1 {
- return errors.New("for partition topic, seek command should
perform on the individual partitions")
+ return newError(SeekFailed, "for partition topic, seek command
should perform on the individual partitions")
}
mid, ok := c.messageID(msgID)
@@ -530,7 +527,7 @@ func (c *consumer) SeekByTime(time time.Time) error {
c.Lock()
defer c.Unlock()
if len(c.consumers) > 1 {
- return errors.New("for partition topic, seek command should
perform on the individual partitions")
+ return newError(SeekFailed, "for partition topic, seek command
should perform on the individual partitions")
}
return c.consumers[0].SeekByTime(time)
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 479a12a..dc4ad7b 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -19,7 +19,6 @@ package pulsar
import (
"context"
- "errors"
"fmt"
"sync"
"time"
@@ -98,10 +97,10 @@ func (c *multiTopicConsumer) Receive(ctx context.Context)
(message Message, err
for {
select {
case <-c.closeCh:
- return nil, ErrConsumerClosed
+ return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-c.messageCh:
if !ok {
- return nil, ErrConsumerClosed
+ return nil, newError(ConsumerClosed, "consumer
closed")
}
return cm.Message, nil
case <-ctx.Done():
@@ -193,11 +192,11 @@ func (c *multiTopicConsumer) Close() {
}
func (c *multiTopicConsumer) Seek(msgID MessageID) error {
- return errors.New("seek command not allowed for multi topic consumer")
+ return newError(SeekFailed, "seek command not allowed for multi topic
consumer")
}
func (c *multiTopicConsumer) SeekByTime(time time.Time) error {
- return errors.New("seek command not allowed for multi topic consumer")
+ return newError(SeekFailed, "seek command not allowed for multi topic
consumer")
}
// Name returns the name of consumer.
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index aaa786a..7072370 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -19,7 +19,6 @@ package pulsar
import (
"context"
- "errors"
"fmt"
"regexp"
"strings"
@@ -141,10 +140,10 @@ func (c *regexConsumer) Receive(ctx context.Context)
(message Message, err error
for {
select {
case <-c.closeCh:
- return nil, ErrConsumerClosed
+ return nil, newError(ConsumerClosed, "consumer closed")
case cm, ok := <-c.messageCh:
if !ok {
- return nil, ErrConsumerClosed
+ return nil, newError(ConsumerClosed, "consumer
closed")
}
return cm.Message, nil
case <-ctx.Done():
@@ -224,11 +223,11 @@ func (c *regexConsumer) Close() {
}
func (c *regexConsumer) Seek(msgID MessageID) error {
- return errors.New("seek command not allowed for regex consumer")
+ return newError(SeekFailed, "seek command not allowed for regex
consumer")
}
func (c *regexConsumer) SeekByTime(time time.Time) error {
- return errors.New("seek command not allowed for regex consumer")
+ return newError(SeekFailed, "seek command not allowed for regex
consumer")
}
// Name returns the name of consumer.
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index f0b27c6..f512b00 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -948,7 +948,7 @@ func TestConsumerReceiveErrAfterClose(t *testing.T) {
case <-time.After(200 * time.Millisecond):
case err = <-errorCh:
}
- assert.Equal(t, ErrConsumerClosed, err)
+ assert.Equal(t, ConsumerClosed, err.(*Error).result)
}
func TestDLQ(t *testing.T) {
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 40761e2..c858946 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -19,7 +19,6 @@ package pulsar
import (
"context"
- "errors"
"time"
"github.com/apache/pulsar-client-go/pulsar/internal"
@@ -44,11 +43,11 @@ func newDlqRouter(client Client, policy *DLQPolicy, logger
log.Logger) (*dlqRout
if policy != nil {
if policy.MaxDeliveries <= 0 {
- return nil, errors.New("DLQPolicy.MaxDeliveries needs
to be > 0")
+ return nil, newError(InvalidConfiguration,
"DLQPolicy.MaxDeliveries needs to be > 0")
}
if policy.DeadLetterTopic == "" {
- return nil, errors.New("DLQPolicy.Topic needs to be set
to a valid topic name")
+ return nil, newError(InvalidConfiguration,
"DLQPolicy.Topic needs to be set to a valid topic name")
}
r.messageCh = make(chan ConsumerMessage)
diff --git a/pulsar/error.go b/pulsar/error.go
index b98ce88..60a832b 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -23,54 +23,82 @@ import "fmt"
type Result int
const (
- // ResultOk means no errors
- ResultOk Result = iota
- // ResultUnknownError means unknown error happened on broker
- ResultUnknownError
- // ResultInvalidConfiguration means invalid configuration
- ResultInvalidConfiguration
- // ResultTimeoutError means operation timed out
- ResultTimeoutError
- // ResultLookupError means broker lookup failed
- ResultLookupError
- // ResultInvalidTopicName means invalid topic name
- ResultInvalidTopicName
- // ResultConnectError means failed to connect to broker
- ResultConnectError
-
+ // Ok means no errors
+ Ok Result = iota
+ // UnknownError means unknown error happened on broker
+ UnknownError
+ // InvalidConfiguration means invalid configuration
+ InvalidConfiguration
+ // TimeoutError means operation timed out
+ TimeoutError
+ //LookupError means broker lookup failed
+ LookupError
+ // ConnectError means failed to connect to broker
+ ConnectError
// ReadError means failed to read from socket
- //ReadError Result = 6
+ ReadError
// AuthenticationError means authentication failed on broker
- //AuthenticationError Result = 7
- //AuthorizationError Result = 8
- //ErrorGettingAuthenticationData Result = 9 // Client cannot find
authorization data
- //BrokerMetadataError Result = 10 // Broker failed in
updating metadata
- //BrokerPersistenceError Result = 11 // Broker failed to
persist entry
- //ChecksumError Result = 12 // Corrupt message
checksum failure
+ AuthenticationError
+ // AuthorizationError client is not authorized to create
producer/consumer
+ AuthorizationError
+ // ErrorGettingAuthenticationData client cannot find authorization data
+ ErrorGettingAuthenticationData
+ // BrokerMetadataError broker failed in updating metadata
+ BrokerMetadataError
+ // BrokerPersistenceError broker failed to persist entry
+ BrokerPersistenceError
+ // ChecksumError corrupt message checksum failure
+ ChecksumError
// ConsumerBusy means Exclusive consumer is already connected
- ConsumerBusy Result = 13
- //NotConnectedError Result = 14 // Producer/Consumer is
not currently connected to broker
- //AlreadyClosedError Result = 15 // Producer/Consumer is
already closed and not accepting any operation
- //InvalidMessage Result = 16 // Error in publishing an
already used message
- //ConsumerNotInitialized Result = 17 // Consumer is not
initialized
- //ProducerNotInitialized Result = 18 // Producer is not
initialized
- //TooManyLookupRequestException Result = 19 // Too Many concurrent
LookupRequest
- // InvalidUrl means Client Initialized with Invalid Broker Url (VIP Url
passed to Client Constructor)
- //InvalidUrl Result = 21
+ ConsumerBusy
+ // NotConnectedError producer/consumer is not currently connected to
broker
+ NotConnectedError
+ // AlreadyClosedError producer/consumer is already closed and not
accepting any operation
+ AlreadyClosedError
+ // InvalidMessage error in publishing an already used message
+ InvalidMessage
+ // ConsumerNotInitialized consumer is not initialized
+ ConsumerNotInitialized
+ // ProducerNotInitialized producer is not initialized
+ ProducerNotInitialized
+ // TooManyLookupRequestException too many concurrent LookupRequest
+ TooManyLookupRequestException
+ // InvalidTopicName means invalid topic name
+ InvalidTopicName
+ // InvalidURL means Client Initialized with Invalid Broker Url (VIP Url
passed to Client Constructor)
+ InvalidURL
// ServiceUnitNotReady unloaded between client did lookup and
producer/consumer got created
- //ServiceUnitNotReady Result = 22
- //OperationNotSupported Result = 23
- //ProducerBlockedQuotaExceededError Result = 24 // Producer is
blocked
- //ProducerBlockedQuotaExceededException Result = 25 // Producer is
getting exception
- //ProducerQueueIsFull Result = 26 // Producer queue
is full
- //MessageTooBig Result = 27 // Trying to send a
messages exceeding the max size
- TopicNotFound Result = 28 // Topic not found
- SubscriptionNotFound Result = 29 // Subscription not found
- //ConsumerNotFound Result = 30 // Consumer not
found
+ ServiceUnitNotReady
+ // OperationNotSupported operation not supported
+ OperationNotSupported
+ // ProducerBlockedQuotaExceededError producer is blocked
+ ProducerBlockedQuotaExceededError
+ // ProducerBlockedQuotaExceededException producer is getting exception
+ ProducerBlockedQuotaExceededException
+ // ProducerQueueIsFull producer queue is full
+ ProducerQueueIsFull
+ // MessageTooBig trying to send a messages exceeding the max size
+ MessageTooBig
+ // TopicNotFound topic not found
+ TopicNotFound
+ // SubscriptionNotFound subscription not found
+ SubscriptionNotFound
+ // ConsumerNotFound consumer not found
+ ConsumerNotFound
// UnsupportedVersionError when an older client/version doesn't support
a required feature
- //UnsupportedVersionError Result = 31
- //TopicTerminated Result = 32 // Topic was
already terminated
- //CryptoError Result = 33 // Error when
crypto operation fails
+ UnsupportedVersionError
+ // TopicTerminated topic was already terminated
+ TopicTerminated
+ // CryptoError error when crypto operation fails
+ CryptoError
+ // ConsumerClosed means consumer already been closed
+ ConsumerClosed
+ // InvalidBatchBuilderType invalid batch builder type
+ InvalidBatchBuilderType
+ // AddToBatchFailed failed to add sendRequest to batchBuilder
+ AddToBatchFailed
+ // SeekFailed seek failed
+ SeekFailed
)
// Error implement error interface, composed of two parts: msg and result.
@@ -79,6 +107,7 @@ type Error struct {
result Result
}
+// Result get error's original result.
func (e *Error) Result() Result {
return e.result
}
@@ -96,22 +125,82 @@ func newError(result Result, msg string) error {
func getResultStr(r Result) string {
switch r {
- case ResultOk:
+ case Ok:
return "OK"
- case ResultUnknownError:
- return "Unknown error"
- case ResultInvalidConfiguration:
+ case UnknownError:
+ return "UnknownError"
+ case InvalidConfiguration:
return "InvalidConfiguration"
- case ResultTimeoutError:
+ case TimeoutError:
return "TimeoutError"
- case ResultLookupError:
+ case LookupError:
return "LookupError"
- case ResultInvalidTopicName:
- return "InvalidTopicName"
- case ResultConnectError:
+ case ConnectError:
return "ConnectError"
+ case ReadError:
+ return "ReadError"
+ case AuthenticationError:
+ return "AuthenticationError"
+ case AuthorizationError:
+ return "AuthorizationError"
+ case ErrorGettingAuthenticationData:
+ return "ErrorGettingAuthenticationData"
+ case BrokerMetadataError:
+ return "BrokerMetadataError"
+ case BrokerPersistenceError:
+ return "BrokerPersistenceError"
+ case ChecksumError:
+ return "ChecksumError"
case ConsumerBusy:
return "ConsumerBusy"
+ case NotConnectedError:
+ return "NotConnectedError"
+ case AlreadyClosedError:
+ return "AlreadyClosedError"
+ case InvalidMessage:
+ return "InvalidMessage"
+ case ConsumerNotInitialized:
+ return "ConsumerNotInitialized"
+ case ProducerNotInitialized:
+ return "ProducerNotInitialized"
+ case TooManyLookupRequestException:
+ return "TooManyLookupRequestException"
+ case InvalidTopicName:
+ return "InvalidTopicName"
+ case InvalidURL:
+ return "InvalidURL"
+ case ServiceUnitNotReady:
+ return "ServiceUnitNotReady"
+ case OperationNotSupported:
+ return "OperationNotSupported"
+ case ProducerBlockedQuotaExceededError:
+ return "ProducerBlockedQuotaExceededError"
+ case ProducerBlockedQuotaExceededException:
+ return "ProducerBlockedQuotaExceededException"
+ case ProducerQueueIsFull:
+ return "ProducerQueueIsFull"
+ case MessageTooBig:
+ return "MessageTooBig"
+ case TopicNotFound:
+ return "TopicNotFound"
+ case SubscriptionNotFound:
+ return "SubscriptionNotFound"
+ case ConsumerNotFound:
+ return "ConsumerNotFound"
+ case UnsupportedVersionError:
+ return "UnsupportedVersionError"
+ case TopicTerminated:
+ return "TopicTerminated"
+ case CryptoError:
+ return "CryptoError"
+ case ConsumerClosed:
+ return "ConsumerClosed"
+ case InvalidBatchBuilderType:
+ return "InvalidBatchBuilderType"
+ case AddToBatchFailed:
+ return "AddToBatchFailed"
+ case SeekFailed:
+ return "SeekFailed"
default:
return fmt.Sprintf("Result(%d)", r)
}
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 73aadb1..bf4d92e 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -72,7 +72,7 @@ func getHashingFunction(s HashingScheme) func(string) uint32 {
func newProducer(client *client, options *ProducerOptions) (*producer, error) {
if options.Topic == "" {
- return nil, newError(ResultInvalidTopicName, "Topic name is
required for producer")
+ return nil, newError(InvalidTopicName, "Topic name is required
for producer")
}
if options.SendTimeout == 0 {
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 34def20..87c97ed 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,7 +19,6 @@ package pulsar
import (
"context"
- "errors"
"sync"
"sync/atomic"
"time"
@@ -46,10 +45,10 @@ const (
)
var (
- errFailAddBatch = errors.New("message send failed")
- errSendTimeout = errors.New("message send timeout")
- errSendQueueIsFull = errors.New("producer send queue is full")
- errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
+ errFailAddToBatch = newError(AddToBatchFailed, "message add to batch
failed")
+ errSendTimeout = newError(TimeoutError, "message send timeout")
+ errSendQueueIsFull = newError(ProducerQueueIsFull, "producer send queue
is full")
+ errMessageTooLarge = newError(MessageTooBig, "message size exceeds
MaxMessageSize")
buffersPool sync.Pool
)
@@ -407,7 +406,7 @@ func (p *partitionProducer) internalSend(request
*sendRequest) {
if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator,
payload, request,
msg.ReplicationClusters, deliverAt); !ok {
p.publishSemaphore.Release()
- request.callback(nil, request.msg, errFailAddBatch)
+ request.callback(nil, request.msg, errFailAddToBatch)
p.log.WithField("size", len(payload)).
WithField("properties", msg.Properties).
Error("unable to add message to batch")
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 9475051..5708cad 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -78,7 +78,7 @@ func TestProducerNoTopic(t *testing.T) {
assert.Nil(t, producer)
assert.NotNil(t, err)
- assert.Equal(t, err.(*Error).Result(), ResultInvalidTopicName)
+ assert.Equal(t, InvalidTopicName, err.(*Error).Result())
}
func TestSimpleProducer(t *testing.T) {
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index d76865e..a019d9c 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -42,11 +42,11 @@ type reader struct {
func newReader(client *client, options ReaderOptions) (Reader, error) {
if options.Topic == "" {
- return nil, newError(ResultInvalidConfiguration, "Topic is
required")
+ return nil, newError(InvalidConfiguration, "Topic is required")
}
if options.StartMessageID == nil {
- return nil, newError(ResultInvalidConfiguration,
"StartMessageID is required")
+ return nil, newError(InvalidConfiguration, "StartMessageID is
required")
}
startMessageID, ok := toTrackingMessageID(options.StartMessageID)
@@ -122,7 +122,7 @@ func (r *reader) Next(ctx context.Context) (Message, error)
{
select {
case cm, ok := <-r.messageCh:
if !ok {
- return nil, ErrConsumerClosed
+ return nil, newError(ConsumerClosed, "consumer
closed")
}
// Acknowledge message immediately because the reader
is based on non-durable subscription. When it reconnects,
@@ -133,7 +133,7 @@ func (r *reader) Next(ctx context.Context) (Message, error)
{
r.pc.AckID(mid)
return cm.Message, nil
}
- return nil, fmt.Errorf("invalid message id type %T",
msgID)
+ return nil, newError(InvalidMessage,
fmt.Sprintf("invalid message id type %T", msgID))
case <-ctx.Done():
return nil, ctx.Err()
}
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index 3eb12f4..0e9e6be 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -19,7 +19,6 @@ package pulsar
import (
"context"
- "errors"
"time"
"github.com/apache/pulsar-client-go/pulsar/internal"
@@ -61,11 +60,11 @@ func newRetryRouter(client Client, policy *DLQPolicy,
retryEnabled bool, logger
if policy != nil && retryEnabled {
if policy.MaxDeliveries <= 0 {
- return nil, errors.New("DLQPolicy.MaxDeliveries needs
to be > 0")
+ return nil, newError(InvalidConfiguration,
"DLQPolicy.MaxDeliveries needs to be > 0")
}
if policy.RetryLetterTopic == "" {
- return nil, errors.New("DLQPolicy.RetryLetterTopic
needs to be set to a valid topic name")
+ return nil, newError(InvalidConfiguration,
"DLQPolicy.RetryLetterTopic needs to be set to a valid topic name")
}
r.messageCh = make(chan RetryMessage)
diff --git a/pulsar/schema.go b/pulsar/schema.go
index 7885603..8021ffe 100644
--- a/pulsar/schema.go
+++ b/pulsar/schema.go
@@ -20,7 +20,6 @@ package pulsar
import (
"bytes"
"encoding/json"
- "errors"
"reflect"
"unsafe"
@@ -311,7 +310,7 @@ func (is8 *Int8Schema) Decode(data []byte, v interface{})
error {
func (is8 *Int8Schema) Validate(message []byte) error {
if len(message) != 1 {
- return errors.New("size of data received by Int8Schema is not
1")
+ return newError(InvalidMessage, "size of data received by
Int8Schema is not 1")
}
return nil
}
@@ -346,7 +345,7 @@ func (is16 *Int16Schema) Decode(data []byte, v interface{})
error {
func (is16 *Int16Schema) Validate(message []byte) error {
if len(message) != 2 {
- return errors.New("size of data received by Int16Schema is not
2")
+ return newError(InvalidMessage, "size of data received by
Int16Schema is not 2")
}
return nil
}
@@ -381,7 +380,7 @@ func (is32 *Int32Schema) Decode(data []byte, v interface{})
error {
func (is32 *Int32Schema) Validate(message []byte) error {
if len(message) != 4 {
- return errors.New("size of data received by Int32Schema is not
4")
+ return newError(InvalidMessage, "size of data received by
Int32Schema is not 4")
}
return nil
}
@@ -416,7 +415,7 @@ func (is64 *Int64Schema) Decode(data []byte, v interface{})
error {
func (is64 *Int64Schema) Validate(message []byte) error {
if len(message) != 8 {
- return errors.New("size of data received by Int64Schema is not
8")
+ return newError(InvalidMessage, "size of data received by
Int64Schema is not 8")
}
return nil
}
@@ -454,7 +453,7 @@ func (fs *FloatSchema) Decode(data []byte, v interface{})
error {
func (fs *FloatSchema) Validate(message []byte) error {
if len(message) != 4 {
- return errors.New("size of data received by FloatSchema is not
4")
+ return newError(InvalidMessage, "size of data received by
FloatSchema is not 4")
}
return nil
}
@@ -492,7 +491,7 @@ func (ds *DoubleSchema) Decode(data []byte, v interface{})
error {
func (ds *DoubleSchema) Validate(message []byte) error {
if len(message) != 8 {
- return errors.New("size of data received by DoubleSchema is not
8")
+ return newError(InvalidMessage, "size of data received by
DoubleSchema is not 8")
}
return nil
}