This is an automated email from the ASF dual-hosted git repository.
crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 024e230d fix: wrap errors using %w to preserve context (#1321)
024e230d is described below
commit 024e230d32fc6a0d5586b8c8bc6b34a208b9e9d0
Author: Eugene R. <[email protected]>
AuthorDate: Thu Dec 19 04:25:08 2024 +0200
fix: wrap errors using %w to preserve context (#1321)
* fix: wrap errors using %w to preserve context
* move the consumer state check
---
oauth2/auth.go | 2 +-
oauth2/cache/cache.go | 10 ++++----
oauth2/store/keyring.go | 13 ++++++-----
oauth2/store/store.go | 2 +-
pulsar/consumer_partition.go | 20 ++++++++--------
pulsar/crypto/default_message_crypto.go | 14 ++++++------
pulsar/internal/commands.go | 2 +-
pulsar/internal/connection.go | 2 +-
pulsar/internal/crypto/producer_encryptor.go | 6 ++---
pulsar/primitiveSerDe.go | 4 ++--
pulsar/producer_impl.go | 2 +-
pulsar/table_view_impl.go | 34 ++++++++++++++--------------
12 files changed, 56 insertions(+), 55 deletions(-)
diff --git a/oauth2/auth.go b/oauth2/auth.go
index 9f4293b5..a4e72d2f 100644
--- a/oauth2/auth.go
+++ b/oauth2/auth.go
@@ -112,7 +112,7 @@ func ExtractUserName(token oauth2.Token) (string, error) {
p := jwt.Parser{}
claims := jwt.MapClaims{}
if _, _, err := p.ParseUnverified(token.AccessToken, claims); err !=
nil {
- return "", fmt.Errorf("unable to decode the access token: %v",
err)
+ return "", fmt.Errorf("unable to decode the access token: %w",
err)
}
username, ok := claims[ClaimNameUserName]
if !ok {
diff --git a/oauth2/cache/cache.go b/oauth2/cache/cache.go
index b7f89b21..e2279843 100644
--- a/oauth2/cache/cache.go
+++ b/oauth2/cache/cache.go
@@ -80,7 +80,7 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
// load from the store and use the access token if it isn't expired
grant, err := t.store.LoadGrant(t.audience)
if err != nil {
- return nil, fmt.Errorf("LoadGrant: %v", err)
+ return nil, fmt.Errorf("LoadGrant: %w", err)
}
t.token = grant.Token
if t.token != nil && t.validateAccessToken(*t.token) {
@@ -90,13 +90,13 @@ func (t *tokenCache) Token() (*xoauth2.Token, error) {
// obtain and cache a fresh access token
grant, err = t.refresher.Refresh(grant)
if err != nil {
- return nil, fmt.Errorf("RefreshGrant: %v", err)
+ return nil, fmt.Errorf("RefreshGrant: %w", err)
}
t.token = grant.Token
err = t.store.SaveGrant(t.audience, *grant)
if err != nil {
// TODO log rather than throw
- return nil, fmt.Errorf("SaveGrant: %v", err)
+ return nil, fmt.Errorf("SaveGrant: %w", err)
}
return t.token, nil
@@ -117,14 +117,14 @@ func (t *tokenCache) InvalidateToken() error {
}
grant, err := t.store.LoadGrant(t.audience)
if err != nil {
- return fmt.Errorf("LoadGrant: %v", err)
+ return fmt.Errorf("LoadGrant: %w", err)
}
if grant.Token != nil && grant.Token.AccessToken ==
previous.AccessToken {
grant.Token.Expiry = time.Unix(0, 0).Add(expiryDelta)
err = t.store.SaveGrant(t.audience, *grant)
if err != nil {
// TODO log rather than throw
- return fmt.Errorf("SaveGrant: %v", err)
+ return fmt.Errorf("SaveGrant: %w", err)
}
}
return nil
diff --git a/oauth2/store/keyring.go b/oauth2/store/keyring.go
index 8a024f59..70fba5b0 100644
--- a/oauth2/store/keyring.go
+++ b/oauth2/store/keyring.go
@@ -20,6 +20,7 @@ package store
import (
"crypto/sha1"
"encoding/json"
+ "errors"
"fmt"
"sync"
@@ -92,7 +93,7 @@ func (f *KeyringStore) LoadGrant(audience string)
(*oauth2.AuthorizationGrant, e
item, err := f.getItem(audience)
if err != nil {
- if err == keyring.ErrKeyNotFound {
+ if errors.Is(err, keyring.ErrKeyNotFound) {
return nil, ErrNoAuthenticationData
}
return nil, err
@@ -119,10 +120,10 @@ func (f *KeyringStore) WhoAmI(audience string) (string,
error) {
key := hashKeyringKey(audience)
authItem, err := f.kr.Get(key)
if err != nil {
- if err == keyring.ErrKeyNotFound {
+ if errors.Is(err, keyring.ErrKeyNotFound) {
return "", ErrNoAuthenticationData
}
- return "", fmt.Errorf("unable to get information from the
keyring: %v", err)
+ return "", fmt.Errorf("unable to get information from the
keyring: %w", err)
}
return authItem.Label, nil
}
@@ -134,13 +135,13 @@ func (f *KeyringStore) Logout() error {
var err error
keys, err := f.kr.Keys()
if err != nil {
- return fmt.Errorf("unable to get information from the keyring:
%v", err)
+ return fmt.Errorf("unable to get information from the keyring:
%w", err)
}
for _, key := range keys {
err = f.kr.Remove(key)
}
if err != nil {
- return fmt.Errorf("unable to update the keyring: %v", err)
+ return fmt.Errorf("unable to update the keyring: %w", err)
}
return nil
}
@@ -180,7 +181,7 @@ func (f *KeyringStore) setItem(item storedItem) error {
}
err = f.kr.Set(i)
if err != nil {
- return fmt.Errorf("unable to update the keyring: %v", err)
+ return fmt.Errorf("unable to update the keyring: %w", err)
}
return nil
}
diff --git a/oauth2/store/store.go b/oauth2/store/store.go
index 55d4c9ec..5e916920 100644
--- a/oauth2/store/store.go
+++ b/oauth2/store/store.go
@@ -26,7 +26,7 @@ import (
// ErrNoAuthenticationData indicates that stored authentication data is not
available
var ErrNoAuthenticationData = errors.New("authentication data is not
available")
-// ErrUnsupportedAuthData ndicates that stored authentication data is unusable
+// ErrUnsupportedAuthData indicates that stored authentication data is unusable
var ErrUnsupportedAuthData = errors.New("authentication data is not usable")
// Store is responsible for persisting authorization grants
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 98848d71..520d9e8d 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -1158,21 +1158,21 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
// error decrypting the payload
if err != nil {
// default crypto failure action
- crypToFailureAction := crypto.ConsumerCryptoFailureActionFail
+ cryptoFailureAction := crypto.ConsumerCryptoFailureActionFail
if pc.options.decryption != nil {
- crypToFailureAction =
pc.options.decryption.ConsumerCryptoFailureAction
+ cryptoFailureAction =
pc.options.decryption.ConsumerCryptoFailureAction
}
- switch crypToFailureAction {
+ switch cryptoFailureAction {
case crypto.ConsumerCryptoFailureActionFail:
- pc.log.Errorf("consuming message failed due to
decryption err :%v", err)
+ pc.log.Errorf("consuming message failed due to
decryption err: %v", err)
pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()),
int64(pbMsgID.GetEntryId()), 0, 0, 0, nil))
return err
case crypto.ConsumerCryptoFailureActionDiscard:
pc.discardCorruptedMessage(pbMsgID,
pb.CommandAck_DecryptionError)
- return fmt.Errorf("discarding message on decryption
error :%v", err)
+ return fmt.Errorf("discarding message on decryption
error: %w", err)
case crypto.ConsumerCryptoFailureActionConsume:
- pc.log.Warnf("consuming encrypted message due to error
in decryption :%v", err)
+ pc.log.Warnf("consuming encrypted message due to error
in decryption: %v", err)
messages := []*message{
{
publishTime:
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
@@ -1767,16 +1767,16 @@ func (pc *partitionConsumer) runEventsLoop() {
func (pc *partitionConsumer) internalClose(req *closeRequest) {
defer close(req.doneCh)
state := pc.getConsumerState()
- if state != consumerReady {
- // this might be redundant but to ensure nack tracker is closed
+ if state == consumerClosed || state == consumerClosing {
+ pc.log.WithField("state", state).Error("Consumer is closing or
has closed")
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
return
}
- if state == consumerClosed || state == consumerClosing {
- pc.log.WithField("state", state).Error("Consumer is closing or
has closed")
+ if state != consumerReady {
+ // this might be redundant but to ensure nack tracker is closed
if pc.nackTracker != nil {
pc.nackTracker.Close()
}
diff --git a/pulsar/crypto/default_message_crypto.go
b/pulsar/crypto/default_message_crypto.go
index 2239fa39..c0a38f53 100644
--- a/pulsar/crypto/default_message_crypto.go
+++ b/pulsar/crypto/default_message_crypto.go
@@ -95,7 +95,7 @@ func (d *DefaultMessageCrypto) addPublicKeyCipher(keyName
string, keyReader KeyR
d.cipherLock.Lock()
defer d.cipherLock.Unlock()
if keyName == "" || keyReader == nil {
- return fmt.Errorf("keyname or keyreader is null")
+ return fmt.Errorf("keyname or keyreader is nil")
}
// read the public key and its info using keyReader
@@ -212,7 +212,7 @@ func (d *DefaultMessageCrypto) Encrypt(encKeys []string,
func (d *DefaultMessageCrypto) Decrypt(msgMetadata MessageMetadataSupplier,
payload []byte,
keyReader KeyReader) ([]byte, error) {
- // if data key is present, attempt to derypt using the existing key
+ // if data key is present, attempt to decrypt using the existing key
if d.dataKey != nil {
decryptedData, err := d.getKeyAndDecryptData(msgMetadata,
payload)
if err != nil {
@@ -342,20 +342,20 @@ func (d *DefaultMessageCrypto) loadPrivateKey(key []byte)
(gocrypto.PrivateKey,
// read the public key into RSA key
func (d *DefaultMessageCrypto) loadPublicKey(key []byte) (gocrypto.PublicKey,
error) {
- var publickKey gocrypto.PublicKey
+ var publicKey gocrypto.PublicKey
pubPem, _ := pem.Decode(key)
if pubPem == nil {
- return publickKey, fmt.Errorf("failed to decode public key")
+ return publicKey, fmt.Errorf("failed to decode public key")
}
genericPublicKey, err := x509.ParsePKIXPublicKey(pubPem.Bytes)
if err != nil {
- return publickKey, err
+ return publicKey, err
}
- publickKey = genericPublicKey
+ publicKey = genericPublicKey
- return publickKey, nil
+ return publicKey, nil
}
func generateDataKey() ([]byte, error) {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 7471ee0d..b9f46234 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -272,7 +272,7 @@ func serializeMessage(wb Buffer,
encryptedPayload, err := encryptor.Encrypt(compressedPayload,
msgMetadata)
if err != nil {
// error occurred while encrypting the payload,
ProducerCryptoFailureAction is set to Fail
- return fmt.Errorf("encryption of message failed,
ProducerCryptoFailureAction is set to Fail. Error :%v", err)
+ return fmt.Errorf("encryption of message failed,
ProducerCryptoFailureAction is set to Fail. Error: %w", err)
}
cmdSize := uint32(proto.Size(cmdSend))
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 84c4323d..1faccc91 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -947,7 +947,7 @@ func (c *connection)
handleTopicMigrated(commandTopicMigrated *pb.CommandTopicMi
resourceID := commandTopicMigrated.GetResourceId()
migratedBrokerServiceURL :=
c.getMigratedBrokerServiceURL(commandTopicMigrated)
if migratedBrokerServiceURL == "" {
- c.log.Warnf("Failed to find the migrated broker url for
resource: %s, migratedBrokerUrl: %s, migratedBrokerUrlTls:%s",
+ c.log.Warnf("Failed to find the migrated broker url for
resource: %d, migratedBrokerUrl: %s, migratedBrokerUrlTls:%s",
resourceID,
commandTopicMigrated.GetBrokerServiceUrl(),
commandTopicMigrated.GetBrokerServiceUrlTls())
diff --git a/pulsar/internal/crypto/producer_encryptor.go
b/pulsar/internal/crypto/producer_encryptor.go
index a5b972da..01dc5f83 100644
--- a/pulsar/internal/crypto/producer_encryptor.go
+++ b/pulsar/internal/crypto/producer_encryptor.go
@@ -55,11 +55,11 @@ func (e *producerEncryptor) Encrypt(payload []byte,
msgMetadata *pb.MessageMetad
crypto.NewMessageMetadataSupplier(msgMetadata),
payload)
- // error encryping the payload
+ // error encrypting the payload
if err != nil {
// error occurred in encrypting the payload
// crypto ProducerCryptoFailureAction is set to send
- // send unencrypted message
+ // unencrypted message
if e.producerCryptoFailureAction ==
crypto.ProducerCryptoFailureActionSend {
e.logger.
WithError(err).
@@ -67,7 +67,7 @@ func (e *producerEncryptor) Encrypt(payload []byte,
msgMetadata *pb.MessageMetad
return payload, nil
}
- return nil, fmt.Errorf("ProducerCryptoFailureAction is set to
Fail and error occurred in encrypting payload :%v", err)
+ return nil, fmt.Errorf("ProducerCryptoFailureAction is set to
Fail and error occurred in encrypting payload: %w", err)
}
return encryptedPayload, nil
}
diff --git a/pulsar/primitiveSerDe.go b/pulsar/primitiveSerDe.go
index 0d53aa1c..da3d8491 100644
--- a/pulsar/primitiveSerDe.go
+++ b/pulsar/primitiveSerDe.go
@@ -100,14 +100,14 @@ func (b BinaryFreeList) Uint64(r io.Reader, byteOrder
binary.ByteOrder) (uint64,
func (b BinaryFreeList) Float64(buf []byte) (float64, error) {
if len(buf) < 8 {
- return 0, fmt.Errorf("cannot decode binary double: %s",
io.ErrShortBuffer)
+ return 0, fmt.Errorf("cannot decode binary double: %w",
io.ErrShortBuffer)
}
return math.Float64frombits(binary.BigEndian.Uint64(buf[:8])), nil
}
func (b BinaryFreeList) Float32(buf []byte) (float32, error) {
if len(buf) < 4 {
- return 0, fmt.Errorf("cannot decode binary float: %s",
io.ErrShortBuffer)
+ return 0, fmt.Errorf("cannot decode binary float: %w",
io.ErrShortBuffer)
}
return math.Float32frombits(binary.BigEndian.Uint32(buf[:4])), nil
}
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index ca923108..8e970d28 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -143,7 +143,7 @@ func newProducer(client *client, options *ProducerOptions)
(*producer, error) {
true,
client.log.SubLogger(log.Fields{"topic":
p.topic}))
if err != nil {
- return nil, fmt.Errorf("unable to get
MessageCrypto instance. Producer creation is abandoned. %v", err)
+ return nil, fmt.Errorf("unable to get
MessageCrypto instance. Producer creation is abandoned. %w", err)
}
p.options.Encryption.MessageCrypto = messageCrypto
}
diff --git a/pulsar/table_view_impl.go b/pulsar/table_view_impl.go
index 60b66e33..5f7f4e2b 100644
--- a/pulsar/table_view_impl.go
+++ b/pulsar/table_view_impl.go
@@ -41,8 +41,8 @@ type TableViewImpl struct {
dataMu sync.Mutex
data map[string]interface{}
- readersMu sync.Mutex
- cancelRaders map[string]cancelReader
+ readersMu sync.Mutex
+ cancelReaders map[string]cancelReader
listenersMu sync.Mutex
listeners []func(string, interface{}) error
@@ -73,12 +73,12 @@ func newTableView(client *client, options TableViewOptions)
(TableView, error) {
}
tv := TableViewImpl{
- client: client,
- options: options,
- data: make(map[string]interface{}),
- cancelRaders: make(map[string]cancelReader),
- logger: logger,
- closedCh: make(chan struct{}),
+ client: client,
+ options: options,
+ data: make(map[string]interface{}),
+ cancelReaders: make(map[string]cancelReader),
+ logger: logger,
+ closedCh: make(chan struct{}),
}
// Do an initial round of partition update check to make sure we can
populate the partition readers
@@ -104,16 +104,16 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
tv.readersMu.Lock()
defer tv.readersMu.Unlock()
- for partition, cancelReader := range tv.cancelRaders {
+ for partition, cancelReader := range tv.cancelReaders {
if _, ok := partitions[partition]; !ok {
cancelReader.cancelFunc()
cancelReader.reader.Close()
- delete(tv.cancelRaders, partition)
+ delete(tv.cancelReaders, partition)
}
}
for partition := range partitions {
- if _, ok := tv.cancelRaders[partition]; !ok {
+ if _, ok := tv.cancelReaders[partition]; !ok {
reader, err := newReader(tv.client, ReaderOptions{
Topic: partition,
StartMessageID: EarliestMessageID(),
@@ -127,14 +127,14 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
- tv.logger.Errorf("read next message
failed for %s: %w", partition, err)
+ tv.logger.Errorf("read next message
failed for %s: %v", partition, err)
}
if msg != nil {
tv.handleMessage(msg)
}
}
ctx, cancelFunc :=
context.WithCancel(context.Background())
- tv.cancelRaders[partition] = cancelReader{
+ tv.cancelReaders[partition] = cancelReader{
reader: reader,
cancelFunc: cancelFunc,
}
@@ -148,7 +148,7 @@ func (tv *TableViewImpl) partitionUpdateCheck() error {
func (tv *TableViewImpl) periodicPartitionUpdateCheck() {
for {
if err := tv.partitionUpdateCheck(); err != nil {
- tv.logger.Errorf("failed to check for changes in number
of partitions: %w", err)
+ tv.logger.Errorf("failed to check for changes in number
of partitions: %v", err)
}
select {
case <-tv.closedCh:
@@ -236,7 +236,7 @@ func (tv *TableViewImpl) Close() {
if !tv.closed {
tv.closed = true
- for _, cancelReader := range tv.cancelRaders {
+ for _, cancelReader := range tv.cancelReaders {
cancelReader.reader.Close()
}
close(tv.closedCh)
@@ -259,7 +259,7 @@ func (tv *TableViewImpl) handleMessage(msg Message) {
for _, listener := range tv.listeners {
if err := listener(msg.Key(),
reflect.Indirect(payload).Interface()); err != nil {
- tv.logger.Errorf("table view listener failed for %v:
%w", msg, err)
+ tv.logger.Errorf("table view listener failed for %v:
%v", msg, err)
}
}
}
@@ -268,7 +268,7 @@ func (tv *TableViewImpl) watchReaderForNewMessages(ctx
context.Context, reader R
for {
msg, err := reader.Next(ctx)
if err != nil {
- tv.logger.Errorf("read next message failed for %s: %w",
reader.Topic(), err)
+ tv.logger.Errorf("read next message failed for %s: %v",
reader.Topic(), err)
}
var e *Error
if (errors.As(err, &e) && e.Result() == ConsumerClosed) ||
errors.Is(err, context.Canceled) {