This is an automated email from the ASF dual-hosted git repository.

crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 024e230d fix: wrap errors using %w to preserve context (#1321)
024e230d is described below

commit 024e230d32fc6a0d5586b8c8bc6b34a208b9e9d0
Author: Eugene R. <[email protected]>
AuthorDate: Thu Dec 19 04:25:08 2024 +0200

    fix: wrap errors using %w to preserve context (#1321)
    
    * fix: wrap errors using %w to preserve context
    
    * move the consumer state check
---
 oauth2/auth.go                               |  2 +-
 oauth2/cache/cache.go                        | 10 ++++----
 oauth2/store/keyring.go                      | 13 ++++++-----
 oauth2/store/store.go                        |  2 +-
 pulsar/consumer_partition.go                 | 20 ++++++++--------
 pulsar/crypto/default_message_crypto.go      | 14 ++++++------
 pulsar/internal/commands.go                  |  2 +-
 pulsar/internal/connection.go                |  2 +-
 pulsar/internal/crypto/producer_encryptor.go |  6 ++---
 pulsar/primitiveSerDe.go                     |  4 ++--
 pulsar/producer_impl.go                      |  2 +-
 pulsar/table_view_impl.go                    | 34 ++++++++++++++--------------
 12 files changed, 56 insertions(+), 55 deletions(-)

diff --git a/oauth2/auth.go b/oauth2/auth.go
index 9f4293b5..a4e72d2f 100644
--- a/oauth2/auth.go
+++ b/oauth2/auth.go
@@ -112,7 +112,7 @@ func ExtractUserName(token oauth2.Token) (string, error) {
        p := jwt.Parser{}
        claims := jwt.MapClaims{}
        if _, _, err := p.ParseUnverified(token.AccessToken, claims); err != nil {
-               return "", fmt.Errorf("unable to decode the access token: %v", err)
+               return "", fmt.Errorf("unable to decode the access token: %w", err)
        }
        username, ok := claims[ClaimNameUserName]
        if !ok {
diff --git a/oauth2/cache/cache.go b/oauth2/cache/cache.go
index b7f89b21..e2279843 100644
--- a/oauth2/cache/cache.go
+++ b/oauth2/cache/cache.go
@@ -80,7 +80,7 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
        // load from the store and use the access token if it isn't expired
        grant, err := t.store.LoadGrant(t.audience)
        if err != nil {
-               return nil, fmt.Errorf("LoadGrant: %v", err)
+               return nil, fmt.Errorf("LoadGrant: %w", err)
        }
        t.token = grant.Token
        if t.token != nil && t.validateAccessToken(*t.token) {
@@ -90,13 +90,13 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
        // obtain and cache a fresh access token
        grant, err = t.refresher.Refresh(grant)
        if err != nil {
-               return nil, fmt.Errorf("RefreshGrant: %v", err)
+               return nil, fmt.Errorf("RefreshGrant: %w", err)
        }
        t.token = grant.Token
        err = t.store.SaveGrant(t.audience, *grant)
        if err != nil {
                // TODO log rather than throw
-               return nil, fmt.Errorf("SaveGrant: %v", err)
+               return nil, fmt.Errorf("SaveGrant: %w", err)
        }
 
        return t.token, nil
@@ -117,14 +117,14 @@ func (t *tokenCache) InvalidateToken() error {
        }
        grant, err := t.store.LoadGrant(t.audience)
        if err != nil {
-               return fmt.Errorf("LoadGrant: %v", err)
+               return fmt.Errorf("LoadGrant: %w", err)
        }
        if grant.Token != nil && grant.Token.AccessToken == previous.AccessToken {
                grant.Token.Expiry = time.Unix(0, 0).Add(expiryDelta)
                err = t.store.SaveGrant(t.audience, *grant)
                if err != nil {
                        // TODO log rather than throw
-                       return fmt.Errorf("SaveGrant: %v", err)
+                       return fmt.Errorf("SaveGrant: %w", err)
                }
        }
        return nil
diff --git a/oauth2/store/keyring.go b/oauth2/store/keyring.go
index 8a024f59..70fba5b0 100644
--- a/oauth2/store/keyring.go
+++ b/oauth2/store/keyring.go
@@ -20,6 +20,7 @@ package store
 import (
        "crypto/sha1"
        "encoding/json"
+       "errors"
        "fmt"
        "sync"
 
@@ -92,7 +93,7 @@ func (f *KeyringStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, e
 
        item, err := f.getItem(audience)
        if err != nil {
-               if err == keyring.ErrKeyNotFound {
+               if errors.Is(err, keyring.ErrKeyNotFound) {
                        return nil, ErrNoAuthenticationData
                }
                return nil, err
@@ -119,10 +120,10 @@ func (f *KeyringStore) WhoAmI(audience string) (string, error) {
        key := hashKeyringKey(audience)
        authItem, err := f.kr.Get(key)
        if err != nil {
-               if err == keyring.ErrKeyNotFound {
+               if errors.Is(err, keyring.ErrKeyNotFound) {
                        return "", ErrNoAuthenticationData
                }
-               return "", fmt.Errorf("unable to get information from the keyring: %v", err)
+               return "", fmt.Errorf("unable to get information from the keyring: %w", err)
        }
        return authItem.Label, nil
 }
@@ -134,13 +135,13 @@ func (f *KeyringStore) Logout() error {
        var err error
        keys, err := f.kr.Keys()
        if err != nil {
-               return fmt.Errorf("unable to get information from the keyring: %v", err)
+               return fmt.Errorf("unable to get information from the keyring: %w", err)
        }
        for _, key := range keys {
                err = f.kr.Remove(key)
        }
        if err != nil {
-               return fmt.Errorf("unable to update the keyring: %v", err)
+               return fmt.Errorf("unable to update the keyring: %w", err)
        }
        return nil
 }
@@ -180,7 +181,7 @@ func (f *KeyringStore) setItem(item storedItem) error {
        }
        err = f.kr.Set(i)
        if err != nil {
-               return fmt.Errorf("unable to update the keyring: %v", err)
+               return fmt.Errorf("unable to update the keyring: %w", err)
        }
        return nil
 }
diff --git a/oauth2/store/store.go b/oauth2/store/store.go
index 55d4c9ec..5e916920 100644
--- a/oauth2/store/store.go
+++ b/oauth2/store/store.go
@@ -26,7 +26,7 @@ import (
 // ErrNoAuthenticationData indicates that stored authentication data is not available
 var ErrNoAuthenticationData = errors.New("authentication data is not available")
 
-// ErrUnsupportedAuthData ndicates that stored authentication data is unusable
+// ErrUnsupportedAuthData indicates that stored authentication data is unusable
 var ErrUnsupportedAuthData = errors.New("authentication data is not usable")
 
 // Store is responsible for persisting authorization grants
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 98848d71..520d9e8d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1158,21 +1158,21 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header
        // error decrypting the payload
        if err != nil {
                // default crypto failure action
-               crypToFailureAction := crypto.ConsumerCryptoFailureActionFail
+               cryptoFailureAction := crypto.ConsumerCryptoFailureActionFail
                if pc.options.decryption != nil {
-                       crypToFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
+                       cryptoFailureAction = pc.options.decryption.ConsumerCryptoFailureAction
                }
 
-               switch crypToFailureAction {
+               switch cryptoFailureAction {
                case crypto.ConsumerCryptoFailureActionFail:
-                       pc.log.Errorf("consuming message failed due to decryption err :%v", err)
+                       pc.log.Errorf("consuming message failed due to decryption err: %v", err)
                        pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, 0, nil))
                        return err
                case crypto.ConsumerCryptoFailureActionDiscard:
                        pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError)
-                       return fmt.Errorf("discarding message on decryption error :%v", err)
+                       return fmt.Errorf("discarding message on decryption error: %w", err)
                case crypto.ConsumerCryptoFailureActionConsume:
-                       pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err)
+                       pc.log.Warnf("consuming encrypted message due to error in decryption: %v", err)
                        messages := []*message{
                                {
                                        publishTime:  timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
@@ -1767,16 +1767,16 @@ func (pc *partitionConsumer) runEventsLoop() {
 func (pc *partitionConsumer) internalClose(req *closeRequest) {
        defer close(req.doneCh)
        state := pc.getConsumerState()
-       if state != consumerReady {
-               // this might be redundant but to ensure nack tracker is closed
+       if state == consumerClosed || state == consumerClosing {
+               pc.log.WithField("state", state).Error("Consumer is closing or has closed")
                if pc.nackTracker != nil {
                        pc.nackTracker.Close()
                }
                return
        }
 
-       if state == consumerClosed || state == consumerClosing {
-               pc.log.WithField("state", state).Error("Consumer is closing or has closed")
+       if state != consumerReady {
+               // this might be redundant but to ensure nack tracker is closed
                if pc.nackTracker != nil {
                        pc.nackTracker.Close()
                }
diff --git a/pulsar/crypto/default_message_crypto.go b/pulsar/crypto/default_message_crypto.go
index 2239fa39..c0a38f53 100644
--- a/pulsar/crypto/default_message_crypto.go
+++ b/pulsar/crypto/default_message_crypto.go
@@ -95,7 +95,7 @@ func (d *DefaultMessageCrypto) addPublicKeyCipher(keyName string, keyReader KeyR
        d.cipherLock.Lock()
        defer d.cipherLock.Unlock()
        if keyName == "" || keyReader == nil {
-               return fmt.Errorf("keyname or keyreader is null")
+               return fmt.Errorf("keyname or keyreader is nil")
        }
 
        // read the public key and its info using keyReader
@@ -212,7 +212,7 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
 func (d *DefaultMessageCrypto) Decrypt(msgMetadata MessageMetadataSupplier,
        payload []byte,
        keyReader KeyReader) ([]byte, error) {
-       // if data key is present, attempt to derypt using the existing key
+       // if data key is present, attempt to decrypt using the existing key
        if d.dataKey != nil {
                decryptedData, err := d.getKeyAndDecryptData(msgMetadata, payload)
                if err != nil {
@@ -342,20 +342,20 @@ func (d *DefaultMessageCrypto) loadPrivateKey(key []byte) (gocrypto.PrivateKey,
 
 // read the public key into RSA key
 func (d *DefaultMessageCrypto) loadPublicKey(key []byte) (gocrypto.PublicKey, error) {
-       var publickKey gocrypto.PublicKey
+       var publicKey gocrypto.PublicKey
 
        pubPem, _ := pem.Decode(key)
        if pubPem == nil {
-               return publickKey, fmt.Errorf("failed to decode public key")
+               return publicKey, fmt.Errorf("failed to decode public key")
        }
 
        genericPublicKey, err := x509.ParsePKIXPublicKey(pubPem.Bytes)
        if err != nil {
-               return publickKey, err
+               return publicKey, err
        }
-       publickKey = genericPublicKey
+       publicKey = genericPublicKey
 
-       return publickKey, nil
+       return publicKey, nil
 }
 
 func generateDataKey() ([]byte, error) {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 7471ee0d..b9f46234 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -272,7 +272,7 @@ func serializeMessage(wb Buffer,
        encryptedPayload, err := encryptor.Encrypt(compressedPayload, msgMetadata)
        if err != nil {
                // error occurred while encrypting the payload, ProducerCryptoFailureAction is set to Fail
-               return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error :%v", err)
+               return fmt.Errorf("encryption of message failed, ProducerCryptoFailureAction is set to Fail. Error: %w", err)
        }
 
        cmdSize := uint32(proto.Size(cmdSend))
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 84c4323d..1faccc91 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -947,7 +947,7 @@ func (c *connection) handleTopicMigrated(commandTopicMigrated *pb.CommandTopicMi
        resourceID := commandTopicMigrated.GetResourceId()
        migratedBrokerServiceURL := c.getMigratedBrokerServiceURL(commandTopicMigrated)
        if migratedBrokerServiceURL == "" {
-               c.log.Warnf("Failed to find the migrated broker url for resource: %s, migratedBrokerUrl: %s, migratedBrokerUrlTls:%s",
+               c.log.Warnf("Failed to find the migrated broker url for resource: %d, migratedBrokerUrl: %s, migratedBrokerUrlTls:%s",
                        resourceID,
                        commandTopicMigrated.GetBrokerServiceUrl(),
                        commandTopicMigrated.GetBrokerServiceUrlTls())
diff --git a/pulsar/internal/crypto/producer_encryptor.go b/pulsar/internal/crypto/producer_encryptor.go
index a5b972da..01dc5f83 100644
--- a/pulsar/internal/crypto/producer_encryptor.go
+++ b/pulsar/internal/crypto/producer_encryptor.go
@@ -55,11 +55,11 @@ func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata *pb.MessageMetad
                crypto.NewMessageMetadataSupplier(msgMetadata),
                payload)
 
-       // error encryping the payload
+       // error encrypting the payload
        if err != nil {
                // error occurred in encrypting the payload
                // crypto ProducerCryptoFailureAction is set to send
-               // send unencrypted message
+               // unencrypted message
                if e.producerCryptoFailureAction == crypto.ProducerCryptoFailureActionSend {
                        e.logger.
                                WithError(err).
@@ -67,7 +67,7 @@ func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata *pb.MessageMetad
                        return payload, nil
                }
 
-               return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload :%v", err)
+               return nil, fmt.Errorf("ProducerCryptoFailureAction is set to Fail and error occurred in encrypting payload: %w", err)
        }
        return encryptedPayload, nil
 }
diff --git a/pulsar/primitiveSerDe.go b/pulsar/primitiveSerDe.go
index 0d53aa1c..da3d8491 100644
--- a/pulsar/primitiveSerDe.go
+++ b/pulsar/primitiveSerDe.go
@@ -100,14 +100,14 @@ func (b BinaryFreeList) Uint64(r io.Reader, byteOrder binary.ByteOrder) (uint64,
 
 func (b BinaryFreeList) Float64(buf []byte) (float64, error) {
        if len(buf) < 8 {
-               return 0, fmt.Errorf("cannot decode binary double: %s", io.ErrShortBuffer)
+               return 0, fmt.Errorf("cannot decode binary double: %w", io.ErrShortBuffer)
        }
        return math.Float64frombits(binary.BigEndian.Uint64(buf[:8])), nil
 }
 
 func (b BinaryFreeList) Float32(buf []byte) (float32, error) {
        if len(buf) < 4 {
-               return 0, fmt.Errorf("cannot decode binary float: %s", io.ErrShortBuffer)
+               return 0, fmt.Errorf("cannot decode binary float: %w", io.ErrShortBuffer)
        }
        return math.Float32frombits(binary.BigEndian.Uint32(buf[:4])), nil
 }
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index ca923108..8e970d28 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -143,7 +143,7 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
                                true,
                                client.log.SubLogger(log.Fields{"topic": p.topic}))
                        if err != nil {
-                               return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %v", err)
+                               return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %w", err)
                        }
                        p.options.Encryption.MessageCrypto = messageCrypto
                }
diff --git a/pulsar/table_view_impl.go b/pulsar/table_view_impl.go
index 60b66e33..5f7f4e2b 100644
--- a/pulsar/table_view_impl.go
+++ b/pulsar/table_view_impl.go
@@ -41,8 +41,8 @@ type TableViewImpl struct {
        dataMu sync.Mutex
        data   map[string]interface{}
 
-       readersMu    sync.Mutex
-       cancelRaders map[string]cancelReader
+       readersMu     sync.Mutex
+       cancelReaders map[string]cancelReader
 
        listenersMu sync.Mutex
        listeners   []func(string, interface{}) error
@@ -73,12 +73,12 @@ func newTableView(client *client, options TableViewOptions) (TableView, error) {
        }
 
        tv := TableViewImpl{
-               client:       client,
-               options:      options,
-               data:         make(map[string]interface{}),
-               cancelRaders: make(map[string]cancelReader),
-               logger:       logger,
-               closedCh:     make(chan struct{}),
+               client:        client,
+               options:       options,
+               data:          make(map[string]interface{}),
+               cancelReaders: make(map[string]cancelReader),
+               logger:        logger,
+               closedCh:      make(chan struct{}),
        }
 
        // Do an initial round of partition update check to make sure we can populate the partition readers
@@ -104,16 +104,16 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
        tv.readersMu.Lock()
        defer tv.readersMu.Unlock()
 
-       for partition, cancelReader := range tv.cancelRaders {
+       for partition, cancelReader := range tv.cancelReaders {
                if _, ok := partitions[partition]; !ok {
                        cancelReader.cancelFunc()
                        cancelReader.reader.Close()
-                       delete(tv.cancelRaders, partition)
+                       delete(tv.cancelReaders, partition)
                }
        }
 
        for partition := range partitions {
-               if _, ok := tv.cancelRaders[partition]; !ok {
+               if _, ok := tv.cancelReaders[partition]; !ok {
                        reader, err := newReader(tv.client, ReaderOptions{
                                Topic:          partition,
                                StartMessageID: EarliestMessageID(),
@@ -127,14 +127,14 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
                        for reader.HasNext() {
                                msg, err := reader.Next(context.Background())
                                if err != nil {
-                                       tv.logger.Errorf("read next message failed for %s: %w", partition, err)
+                                       tv.logger.Errorf("read next message failed for %s: %v", partition, err)
                                }
                                if msg != nil {
                                        tv.handleMessage(msg)
                                }
                        }
                        ctx, cancelFunc := context.WithCancel(context.Background())
-                       tv.cancelRaders[partition] = cancelReader{
+                       tv.cancelReaders[partition] = cancelReader{
                                reader:     reader,
                                cancelFunc: cancelFunc,
                        }
@@ -148,7 +148,7 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
 func (tv *TableViewImpl) periodicPartitionUpdateCheck() {
        for {
                if err := tv.partitionUpdateCheck(); err != nil {
-                       tv.logger.Errorf("failed to check for changes in number of partitions: %w", err)
+                       tv.logger.Errorf("failed to check for changes in number of partitions: %v", err)
                }
                select {
                case <-tv.closedCh:
@@ -236,7 +236,7 @@ func (tv *TableViewImpl) Close() {
 
        if !tv.closed {
                tv.closed = true
-               for _, cancelReader := range tv.cancelRaders {
+               for _, cancelReader := range tv.cancelReaders {
                        cancelReader.reader.Close()
                }
                close(tv.closedCh)
@@ -259,7 +259,7 @@ func (tv *TableViewImpl) handleMessage(msg Message) {
 
        for _, listener := range tv.listeners {
                if err := listener(msg.Key(), reflect.Indirect(payload).Interface()); err != nil {
-                       tv.logger.Errorf("table view listener failed for %v: %w", msg, err)
+                       tv.logger.Errorf("table view listener failed for %v: %v", msg, err)
                }
        }
 }
@@ -268,7 +268,7 @@ func (tv *TableViewImpl) watchReaderForNewMessages(ctx context.Context, reader R
        for {
                msg, err := reader.Next(ctx)
                if err != nil {
-                       tv.logger.Errorf("read next message failed for %s: %w", reader.Topic(), err)
+                       tv.logger.Errorf("read next message failed for %s: %v", reader.Topic(), err)
                }
                var e *Error
                if (errors.As(err, &e) && e.Result() == ConsumerClosed) || errors.Is(err, context.Canceled) {

Reply via email to