This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 468bfd6 Encryption failure test case fix (#708)
468bfd6 is described below
commit 468bfd6bdcd45857e64fd499b98c7f30ec6f60f7
Author: Garule Prabhudas <[email protected]>
AuthorDate: Mon Jan 17 12:02:51 2022 +0530
Encryption failure test case fix (#708)
* test case to detect race condition in creation of producer/consumer
* fix for race condition in consumer/producer creation
* refactor
* restore test case
Co-authored-by: PGarule <[email protected]>
---
pulsar/consumer_impl.go | 30 +++++++++++++++++++
pulsar/consumer_partition.go | 7 -----
pulsar/consumer_test.go | 70 +++++++++++++++++++++++++++++++++++++++++++-
pulsar/producer_impl.go | 21 +++++++++++++
pulsar/producer_partition.go | 21 -------------
pulsar/reader_impl.go | 12 ++++++++
6 files changed, 132 insertions(+), 29 deletions(-)
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 00e0b76..2bd3ed5 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -25,6 +25,7 @@ import (
"sync"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -156,6 +157,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, err
}
topic = tns[0].Name
+ err = addMessageCryptoIfMissing(client, &options, topic)
+ if err != nil {
+ return nil, err
+ }
return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false)
}
@@ -168,6 +173,11 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
}
options.Topics = distinct(options.Topics)
+ err = addMessageCryptoIfMissing(client, &options, options.Topics)
+ if err != nil {
+ return nil, err
+ }
+
return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
}
@@ -181,6 +191,12 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
if err != nil {
return nil, err
}
+
+ err = addMessageCryptoIfMissing(client, &options, tn.Name)
+ if err != nil {
+ return nil, err
+ }
+
return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq)
}
@@ -654,3 +670,17 @@ func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) {
return mid, true
}
+
+func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics interface{}) error {
+ // decryption is enabled, use default messagecrypto if not provided
+ if options.Decryption != nil && options.Decryption.MessageCrypto == nil {
+ messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt",
+ false,
+ client.log.SubLogger(log.Fields{"topic": topics}))
+ if err != nil {
+ return err
+ }
+ options.Decryption.MessageCrypto = messageCrypto
+ }
+ return nil
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 1d95c42..7679a8c 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -186,13 +186,6 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
if pc.options.decryption == nil {
decryptor = cryptointernal.NewNoopDecryptor() // default to noopDecryptor
} else {
- if options.decryption.MessageCrypto == nil {
- messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt", false, pc.log)
- if err != nil {
- return nil, err
- }
- options.decryption.MessageCrypto = messageCrypto
- }
decryptor = cryptointernal.NewConsumerDecryptor(
options.decryption.KeyReader,
options.decryption.MessageCrypto,
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 55823e4..cadd8e4 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -332,14 +332,77 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
assert.Nil(t, err)
defer client.Close()
+ topic := "persistent://public/default/testGetPartitions5"
+ testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions5/partitions"
+
+ makeHTTPCall(t, http.MethodPut, testURL, "64")
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ topics, err := client.TopicPartitions(topic)
+ assert.Nil(t, err)
+ assert.Equal(t, topic+"-partition-0", topics[0])
+ assert.Equal(t, topic+"-partition-1", topics[1])
+ assert.Equal(t, topic+"-partition-2", topics[2])
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-sub",
+ Type: Exclusive,
+ ReceiverQueueSize: 10,
+ })
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ ctx := context.Background()
+ for i := 0; i < 10; i++ {
+ _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.Nil(t, err)
+ }
+
+ msgs := make([]string, 0)
+
+ for i := 0; i < 10; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ msgs = append(msgs, string(msg.Payload()))
+
+ fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+ msg.ID(), string(msg.Payload()))
+
+ consumer.Ack(msg)
+ }
+
+ assert.Equal(t, len(msgs), 10)
+}
+
+func TestPartitionTopicsConsumerPubSubEncryption(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
topic := "persistent://public/default/testGetPartitions"
testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
- makeHTTPCall(t, http.MethodPut, testURL, "64")
+ makeHTTPCall(t, http.MethodPut, testURL, "6")
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ Keys: []string{"client-rsa.pem"},
+ },
})
assert.Nil(t, err)
defer producer.Close()
@@ -355,6 +418,11 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
SubscriptionName: "my-sub",
Type: Exclusive,
ReceiverQueueSize: 10,
+ Decryption: &MessageDecryptionInfo{
+ KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail,
+ },
})
assert.Nil(t, err)
defer consumer.Close()
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 48e2aa4..9bbfccb 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -19,11 +19,13 @@ package pulsar
import (
"context"
+ "fmt"
"sync"
"sync/atomic"
"time"
"unsafe"
+ "github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
)
@@ -124,6 +126,25 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
}
}
+ encryption := options.Encryption
+ // add default message crypto if not provided
+ if encryption != nil && len(encryption.Keys) > 0 {
+ if encryption.KeyReader == nil {
+ return nil, fmt.Errorf("encryption is enabled, KeyReader can not be nil")
+ }
+
+ if encryption.MessageCrypto == nil {
+ logCtx := fmt.Sprintf("[%v] [%v]", p.topic, p.options.Name)
+ messageCrypto, err := crypto.NewDefaultMessageCrypto(logCtx,
+ true,
+ client.log.SubLogger(log.Fields{"topic": p.topic}))
+ if err != nil {
+ return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %v", err)
+ }
+ p.options.Encryption.MessageCrypto = messageCrypto
+ }
+ }
+
err := p.internalCreatePartitionsProducers()
if err != nil {
return nil, err
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 8fdcfdb..e318b81 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,13 +19,11 @@ package pulsar
import (
"context"
- "fmt"
"strings"
"sync"
"sync/atomic"
"time"
- "github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
@@ -142,25 +140,6 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
} else {
p.userProvidedProducerName = false
}
-
- encryption := options.Encryption
- // add default message crypto if not provided
- if encryption != nil && len(encryption.Keys) > 0 {
- if encryption.KeyReader == nil {
- return nil, fmt.Errorf("encryption is enabled, KeyReader can not be nil")
- }
-
- if encryption.MessageCrypto == nil {
- logCtx := fmt.Sprintf("[%v] [%v] [%v]", p.topic, p.producerName, p.producerID)
- messageCrypto, err := crypto.NewDefaultMessageCrypto(logCtx, true, logger)
- if err != nil {
- logger.WithError(err).Error("Unable to get MessageCrypto instance. Producer creation is abandoned")
- return nil, err
- }
- p.options.Encryption.MessageCrypto = messageCrypto
- }
- }
-
err := p.grabCnx()
if err != nil {
logger.WithError(err).Error("Failed to create producer")
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index c854325..0fed80c 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -23,6 +23,7 @@ import (
"sync"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
)
@@ -76,6 +77,17 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
receiverQueueSize = defaultReceiverQueueSize
}
+ // decryption is enabled, use default message crypto if not provided
+ if options.Decryption != nil && options.Decryption.MessageCrypto == nil {
+ messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt",
+ false,
+ client.log.SubLogger(log.Fields{"topic": options.Topic}))
+ if err != nil {
+ return nil, err
+ }
+ options.Decryption.MessageCrypto = messageCrypto
+ }
+
consumerOptions := &partitionConsumerOpts{
topic: options.Topic,
consumerName: options.Name,