This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ab96ad7 Encryption support ext consumer (#612)
ab96ad7 is described below
commit ab96ad7d84b7c53e3e6d241f68c24eb7d6fa0037
Author: Garule Prabhudas <[email protected]>
AuthorDate: Sat Oct 9 14:14:53 2021 +0530
Encryption support ext consumer (#612)
* add ability to encrypt messages
- use base crypto package for encryption
* fix typo
* lint fixes
* address review suggestions
* revert go mod
* remove encryption context
- move it to Consumer MR
* try to fix check issues
* remove unused code
* consumer encryption/decryption changes
* remove embedded crypto struct
* remove embedded struct
* add comments
* merge conflict issues fix
* add noop decryptor
* lint fixes
* fix test case
* refactor and reader encryption changes
* refactor
- move decryptor creation to partition_producer.go
- update reader_impl
- update consumer_impl
* address review feedback
* Nack on decryption failure
* add comment on test case
Co-authored-by: PGarule <[email protected]>
---
pulsar/consumer.go | 3 +
pulsar/consumer_impl.go | 1 +
pulsar/consumer_partition.go | 108 ++-
pulsar/consumer_partition_test.go | 4 +
pulsar/consumer_test.go | 847 ++++++++++++++++++++-
pulsar/encryption.go | 12 +
pulsar/impl_message.go | 23 +
pulsar/internal/crypto/consumer_decryptor.go | 60 ++
.../crypto/decryptor.go} | 23 +-
.../crypto/noop_decryptor.go} | 30 +-
.../pulsartracing/message_carrier_util_test.go | 4 +
pulsar/message.go | 4 +
pulsar/reader.go | 3 +
pulsar/reader_impl.go | 1 +
pulsar/reader_test.go | 56 ++
15 files changed, 1148 insertions(+), 31 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 1c52b29..c9fbc0d 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -155,6 +155,9 @@ type ConsumerOptions struct {
// MaxReconnectToBroker set the maximum retry number of
reconnectToBroker. (default: ultimate)
MaxReconnectToBroker *uint
+
+ // Decryption decryption related fields to decrypt the encrypted message
+ Decryption *MessageDecryptionInfo
}
// Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 6677062..232079b 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -335,6 +335,7 @@ func (c *consumer) internalTopicSubscribeToPartitions()
error {
maxReconnectToBroker:
c.options.MaxReconnectToBroker,
keySharedPolicy:
c.options.KeySharedPolicy,
schema: c.options.Schema,
+ decryption:
c.options.Decryption,
}
cons, err := newPartitionConsumer(c, c.client, opts,
c.messageCh, c.dlq, c.metrics)
ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 5f74bcf..e691d14 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -26,8 +26,10 @@ import (
"github.com/gogo/protobuf/proto"
+ "github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+ cryptointernal
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/apache/pulsar-client-go/pulsar/log"
@@ -98,6 +100,7 @@ type partitionConsumerOpts struct {
maxReconnectToBroker *uint
keySharedPolicy *KeySharedPolicy
schema Schema
+ decryption *MessageDecryptionInfo
}
type partitionConsumer struct {
@@ -142,6 +145,7 @@ type partitionConsumer struct {
providersMutex sync.RWMutex
compressionProviders map[pb.CompressionType]compression.Provider
metrics *internal.TopicMetrics
+ decryptor cryptointernal.Decryptor
}
func newPartitionConsumer(parent Consumer, client *client, options
*partitionConsumerOpts,
@@ -176,6 +180,27 @@ func newPartitionConsumer(parent Consumer, client *client,
options *partitionCon
"subscription": options.subscription,
"consumerID": pc.consumerID,
})
+
+ var decryptor cryptointernal.Decryptor
+ if pc.options.decryption == nil {
+ decryptor = cryptointernal.NewNoopDecryptor() // default to
noopDecryptor
+ } else {
+ if options.decryption.MessageCrypto == nil {
+ messageCrypto, err :=
crypto.NewDefaultMessageCrypto("decrypt", false, pc.log)
+ if err != nil {
+ return nil, err
+ }
+ options.decryption.MessageCrypto = messageCrypto
+ }
+ decryptor = cryptointernal.NewConsumerDecryptor(
+ options.decryption.KeyReader,
+ options.decryption.MessageCrypto,
+ pc.log,
+ )
+ }
+
+ pc.decryptor = decryptor
+
pc.nackTracker = newNegativeAcksTracker(pc,
options.nackRedeliveryDelay, pc.log)
err := pc.grabConn()
@@ -480,7 +505,54 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
return err
}
- uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta,
headersAndPayload)
+ decryptedPayload, err :=
pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+ // error decrypting the payload
+ if err != nil {
+ // default crypto failure action
+ crypToFailureAction := crypto.ConsumerCryptoFailureActionFail
+ if pc.options.decryption != nil {
+ crypToFailureAction =
pc.options.decryption.ConsumerCryptoFailureAction
+ }
+
+ switch crypToFailureAction {
+ case crypto.ConsumerCryptoFailureActionFail:
+ pc.log.Errorf("consuming message failed due to
decryption err :%v", err)
+
pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()),
int64(pbMsgID.GetEntryId()), 0, 0, nil))
+ return err
+ case crypto.ConsumerCryptoFailureActionDiscard:
+ pc.discardCorruptedMessage(pbMsgID,
pb.CommandAck_DecryptionError)
+ return fmt.Errorf("discarding message on decryption
error :%v", err)
+ case crypto.ConsumerCryptoFailureActionConsume:
+ pc.log.Warnf("consuming encrypted message due to error
in decryption :%v", err)
+ messages := []*message{
+ {
+ publishTime:
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+ eventTime:
timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+ key: msgMeta.GetPartitionKey(),
+ producerName: msgMeta.GetProducerName(),
+ properties:
internal.ConvertToStringMap(msgMeta.GetProperties()),
+ topic: pc.topic,
+ msgID: newMessageID(
+ int64(pbMsgID.GetLedgerId()),
+ int64(pbMsgID.GetEntryId()),
+ pbMsgID.GetBatchIndex(),
+ pc.partitionIdx,
+ ),
+ payLoad:
headersAndPayload.ReadableSlice(),
+ schema: pc.options.schema,
+ replicationClusters:
msgMeta.GetReplicateTo(),
+ replicatedFrom:
msgMeta.GetReplicatedFrom(),
+ redeliveryCount:
response.GetRedeliveryCount(),
+ encryptionContext:
createEncryptionContext(msgMeta),
+ },
+ }
+ pc.queueCh <- messages
+ return nil
+ }
+ }
+
+ // decryption is success, decompress the payload
+ uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta,
internal.NewBufferWrapper(decryptedPayload))
if err != nil {
pc.discardCorruptedMessage(pbMsgID,
pb.CommandAck_DecompressionError)
return err
@@ -493,6 +565,7 @@ func (pc *partitionConsumer) MessageReceived(response
*pb.CommandMessage, header
if msgMeta.NumMessagesInBatch != nil {
numMsgs = int(msgMeta.GetNumMessagesInBatch())
}
+
messages := make([]*message, 0)
var ackTracker *ackTracker
// are there multiple messages in this batch?
@@ -590,6 +663,39 @@ func (pc *partitionConsumer)
messageShouldBeDiscarded(msgID trackingMessageID) b
return pc.startMessageID.greaterEqual(msgID.messageID)
}
+// create EncryptionContext from message metadata
+// this will be used to decrypt the message payload outside of this client
+// it is the responsibility of end user to decrypt the payload
+// It will be used only when crypto failure action is set to consume i.e
crypto.ConsumerCryptoFailureActionConsume
+func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext {
+ encCtx := EncryptionContext{
+ Algorithm: msgMeta.GetEncryptionAlgo(),
+ Param: msgMeta.GetEncryptionParam(),
+ UncompressedSize: int(msgMeta.GetUncompressedSize()),
+ BatchSize: int(msgMeta.GetNumMessagesInBatch()),
+ }
+
+ if msgMeta.Compression != nil {
+ encCtx.CompressionType = CompressionType(*msgMeta.Compression)
+ }
+
+ kMap := map[string]EncryptionKey{}
+ for _, k := range msgMeta.GetEncryptionKeys() {
+ metaMap := map[string]string{}
+ for _, m := range k.GetMetadata() {
+ metaMap[*m.Key] = *m.Value
+ }
+
+ kMap[*k.Key] = EncryptionKey{
+ KeyValue: k.GetValue(),
+ Metadata: metaMap,
+ }
+ }
+
+ encCtx.Keys = kMap
+ return &encCtx
+}
+
func (pc *partitionConsumer) ConnectionClosed() {
// Trigger reconnection in the consumer goroutine
pc.log.Debug("connection closed and send to connectClosedCh")
diff --git a/pulsar/consumer_partition_test.go
b/pulsar/consumer_partition_test.go
index b1322e3..560afb6 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -21,6 +21,7 @@ import (
"testing"
"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+ "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
"github.com/stretchr/testify/assert"
@@ -36,6 +37,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
compressionProviders:
make(map[pb.CompressionType]compression.Provider),
options: &partitionConsumerOpts{},
metrics:
internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
+ decryptor: crypto.NewNoopDecryptor(),
}
headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
@@ -67,6 +69,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
compressionProviders:
make(map[pb.CompressionType]compression.Provider),
options: &partitionConsumerOpts{},
metrics:
internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
+ decryptor: crypto.NewNoopDecryptor(),
}
headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
@@ -98,6 +101,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
compressionProviders:
make(map[pb.CompressionType]compression.Provider),
options: &partitionConsumerOpts{},
metrics:
internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
+ decryptor: crypto.NewNoopDecryptor(),
}
headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 66587f9..55823e4 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -20,6 +20,7 @@ package pulsar
import (
"context"
"fmt"
+ "io/ioutil"
"log"
"net/http"
"strconv"
@@ -27,8 +28,13 @@ import (
"testing"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/apache/pulsar-client-go/pulsar/internal"
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ plog "github.com/apache/pulsar-client-go/pulsar/log"
+ "github.com/gogo/protobuf/proto"
"github.com/google/uuid"
+ "github.com/pierrec/lz4"
"github.com/stretchr/testify/assert"
)
@@ -92,7 +98,6 @@ func TestProducerConsumer(t *testing.T) {
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
-
// ack message
consumer.Ack(msg)
}
@@ -2104,3 +2109,843 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T)
{
)
assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
}
+
+func TestProducerConsumerRSAEncryption(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
+
+ cryptoConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Decryption: &MessageDecryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ ConsumerCryptoFailureAction:
crypto.ConsumerCryptoFailureActionFail,
+ },
+ SubscriptionName: "crypto-subscription",
+ Schema: NewStringSchema(nil),
+ })
+
+ assert.Nil(t, err)
+
+ normalConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "normal-subscription",
+ Schema: NewStringSchema(nil),
+ })
+
+ assert.Nil(t, err)
+
+ cryptoProducer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ Keys: []string{"client-rsa.pem"},
+ },
+ Schema: NewStringSchema(nil),
+ })
+
+ assert.Nil(t, err)
+
+ msgFormat := "my-message-%v"
+
+ totalMessages := 10
+
+ ctx := context.Background()
+
+ for i := 0; i < totalMessages; i++ {
+ _, err := cryptoProducer.Send(ctx, &ProducerMessage{
+ Value: fmt.Sprintf(msgFormat, i),
+ })
+
+ assert.Nil(t, err)
+ }
+
+ // try to consume with normal consumer
+ normalConsumerCtx, cancel := context.WithTimeout(context.Background(),
500*time.Millisecond)
+ defer cancel()
+
+ msg, err := normalConsumer.Receive(normalConsumerCtx)
+ // msg should be null as the consumer will not be able to decrypt
+ assert.NotNil(t, err)
+ assert.Nil(t, msg)
+
+ // try to consume the message by crypto consumer
+ // consumer should be able to read all the messages
+ var actualMessage *string
+ for i := 0; i < totalMessages; i++ {
+ msg, err := cryptoConsumer.Receive(ctx)
+ fmt.Println(msg)
+ assert.Nil(t, err)
+ expectedMsg := fmt.Sprintf(msgFormat, i)
+ err = msg.GetSchemaValue(&actualMessage)
+ assert.Nil(t, err)
+ assert.Equal(t, expectedMsg, *actualMessage)
+ cryptoConsumer.Ack(msg)
+ }
+}
+
+func TestProducerConsumerRSAEncryptionWithCompression(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
+
+ cryptoConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Decryption: &MessageDecryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ },
+ SubscriptionName: "crypto-subscription",
+ Schema: NewStringSchema(nil),
+ })
+
+ assert.Nil(t, err)
+
+ normalConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "normal-subscription",
+ Schema: NewStringSchema(nil),
+ })
+
+ assert.Nil(t, err)
+
+ cryptoProducer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ Keys: []string{"client-rsa.pem"},
+ },
+ Schema: NewStringSchema(nil),
+ CompressionType: LZ4,
+ })
+
+ assert.Nil(t, err)
+
+ msgFormat := "my-message-%v"
+
+ totalMessages := 10
+
+ ctx := context.Background()
+
+ for i := 0; i < totalMessages; i++ {
+ _, err := cryptoProducer.Send(ctx, &ProducerMessage{
+ Value: fmt.Sprintf(msgFormat, i),
+ })
+
+ assert.Nil(t, err)
+ }
+
+ // try to consume with normal consumer
+ normalConsumerCtx, cancel := context.WithTimeout(context.Background(),
500*time.Millisecond)
+ defer cancel()
+
+ msg, err := normalConsumer.Receive(normalConsumerCtx)
+ // msg should be null as the consumer will not be able to decrypt
+ assert.NotNil(t, err)
+ assert.Nil(t, msg)
+
+ // try to consume the message by crypto consumer
+ // consumer should be able to read all the messages
+ var actualMessage *string
+ for i := 0; i < totalMessages; i++ {
+ msg, err := cryptoConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ expectedMsg := fmt.Sprintf(msgFormat, i)
+ err = msg.GetSchemaValue(&actualMessage)
+ assert.Nil(t, err)
+ assert.Equal(t, expectedMsg, *actualMessage)
+ cryptoConsumer.Ack(msg)
+ }
+}
+
+func TestBatchProducerConsumerRSAEncryptionWithCompression(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
+
+ cryptoConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ Decryption: &MessageDecryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ },
+ SubscriptionName: "crypto-subscription",
+ Schema: NewStringSchema(nil),
+ })
+
+ assert.Nil(t, err)
+
+ normalConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "normal-subscription",
+ Schema: NewStringSchema(nil),
+ })
+
+ assert.Nil(t, err)
+ batchSize := 2
+ cryptoProducer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ Keys: []string{"client-rsa.pem"},
+ },
+ Schema: NewStringSchema(nil),
+ CompressionType: LZ4,
+ DisableBatching: false,
+ BatchingMaxMessages: uint(batchSize),
+ })
+
+ assert.Nil(t, err)
+
+ msgFormat := "my-message-%v"
+
+ totalMessages := 10
+
+ ctx := context.Background()
+
+ for i := 0; i < totalMessages; i++ {
+ _, err := cryptoProducer.Send(ctx, &ProducerMessage{
+ Value: fmt.Sprintf(msgFormat, i),
+ })
+
+ assert.Nil(t, err)
+ }
+
+ // try to consume with normal consumer
+ normalConsumerCtx, cancel := context.WithTimeout(context.Background(),
500*time.Millisecond)
+ defer cancel()
+
+ msg, err := normalConsumer.Receive(normalConsumerCtx)
+ // msg should be null as the consumer will not be able to decrypt
+ assert.NotNil(t, err)
+ assert.Nil(t, msg)
+
+ // try to consume the message by crypto consumer
+ // consumer should be able to read all the messages
+ var actualMessage *string
+ for i := 0; i < totalMessages; i++ {
+ msg, err := cryptoConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ expectedMsg := fmt.Sprintf(msgFormat, i)
+ err = msg.GetSchemaValue(&actualMessage)
+ assert.Nil(t, err)
+ assert.Equal(t, expectedMsg, *actualMessage)
+ cryptoConsumer.Ack(msg)
+ }
+}
+
+func TestProducerConsumerRedeliveryOfFailedEncryptedMessages(t *testing.T) {
+ // create new client instance for each producer and consumer
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ clientCryptoConsumer, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+ defer clientCryptoConsumer.Close()
+
+ clientCryptoConsumerInvalidKeyReader, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+ defer clientCryptoConsumerInvalidKeyReader.Close()
+
+ clientcryptoConsumerNoKeyReader, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+ defer clientcryptoConsumerNoKeyReader.Close()
+
+ topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
+
+ cryptoProducer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ Keys: []string{"client-rsa.pem"},
+ },
+ CompressionType: LZ4,
+ Schema: NewStringSchema(nil),
+ })
+ assert.Nil(t, err)
+
+ sharedSubscription := "crypto-shared-subscription"
+
+ cryptoConsumer, err := clientCryptoConsumer.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sharedSubscription,
+ Decryption: &MessageDecryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ },
+ Schema: NewStringSchema(nil),
+ Type: Shared,
+ NackRedeliveryDelay: 1 * time.Second,
+ })
+ assert.Nil(t, err)
+
+ cryptoConsumerInvalidKeyReader, err :=
clientCryptoConsumerInvalidKeyReader.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sharedSubscription,
+ Decryption: &MessageDecryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa_invalid.pem"),
+ },
+ Schema: NewStringSchema(nil),
+ Type: Shared,
+ NackRedeliveryDelay: 1 * time.Second,
+ })
+ assert.Nil(t, err)
+
+ cryptoConsumerNoKeyReader, err :=
clientcryptoConsumerNoKeyReader.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: sharedSubscription,
+ Schema: NewStringSchema(nil),
+ Type: Shared,
+ NackRedeliveryDelay: 1 * time.Second,
+ })
+ assert.Nil(t, err)
+
+ totalMessages := 5
+ message := "my-message-%v"
+ // since messages can be in random order
+ // map can be used to check if all the messages are received
+ messageMap := map[string]struct{}{}
+
+ // producer messages
+ for i := 0; i < totalMessages; i++ {
+ mid, err := cryptoProducer.Send(context.Background(),
&ProducerMessage{
+ Value: fmt.Sprintf(message, i),
+ })
+ assert.Nil(t, err)
+ fmt.Printf("Sent : %v\n", mid)
+ }
+
+ // Consuming from consumer 2 and 3
+ // no message should be returned since they can't decrypt the message
+ ctxWithTimeOut1, c1 := context.WithTimeout(context.Background(),
2*time.Second)
+ defer c1()
+
+ ctxWithTimeOut2, c2 := context.WithTimeout(context.Background(),
2*time.Second)
+ defer c2()
+
+ // try to consume messages
+ msg, err := cryptoConsumerInvalidKeyReader.Receive(ctxWithTimeOut1)
+ assert.NotNil(t, err)
+ assert.Nil(t, msg)
+
+ msg, err = cryptoConsumerNoKeyReader.Receive(ctxWithTimeOut2)
+ assert.NotNil(t, err)
+ assert.Nil(t, msg)
+
+ cryptoConsumerInvalidKeyReader.Close()
+ cryptoConsumerNoKeyReader.Close()
+
+ // try to consume by consumer1
+ // all the messages would by received by it
+ var receivedMsg *string
+ for i := 0; i < totalMessages; i++ {
+ m, err := cryptoConsumer.Receive(context.Background())
+ assert.Nil(t, err)
+ err = m.GetSchemaValue(&receivedMsg)
+ assert.Nil(t, err)
+ messageMap[*receivedMsg] = struct{}{}
+ cryptoConsumer.Ack(m)
+ fmt.Printf("Received : %v\n", m.ID())
+ }
+
+ // check if all messages were received
+ for i := 0; i < totalMessages; i++ {
+ key := fmt.Sprintf(message, i)
+ _, ok := messageMap[key]
+ assert.True(t, ok)
+ }
+}
+
+func TestRSAEncryptionFailure(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+ client.Close()
+
+ topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
+
+ // 1. invalid key name
+ // create producer with invalid key
+ // producer creation succeeds but message sending should fail with an
error
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa_invalid.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ Keys: []string{"client-rsa.pem"},
+ },
+ Schema: NewStringSchema(nil),
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ assert.NotNil(t, producer)
+
+ // sending of message should fail with an error, since invalid rsa keys
are configured
+ mid, err := producer.Send(context.Background(), &ProducerMessage{
+ Value: "some-message",
+ })
+
+ assert.Nil(t, mid)
+ assert.NotNil(t, err)
+ producer.Close()
+
+ // 2. Producer with valid key name
+ producer, err = client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ Keys: []string{"client-rsa.pem"},
+ },
+ Schema: NewStringSchema(nil),
+ DisableBatching: true,
+ })
+ assert.Nil(t, err)
+ assert.NotNil(t, producer)
+
+ subscriptionName := "enc-failure-subcription"
+ totalMessages := 10
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subscriptionName,
+ })
+ assert.Nil(t, err)
+
+ messageFormat := "my-message-%v"
+ for i := 0; i < totalMessages; i++ {
+ _, err := producer.Send(context.Background(), &ProducerMessage{
+ Value: fmt.Sprintf(messageFormat, i),
+ })
+ assert.Nil(t, err)
+ }
+
+ // 3. KeyReader is not set by the consumer
+ // Receive should fail since KeyReader is not setup
+ // because default behaviour of consumer is fail receiving message if
error in decryption
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+ defer cancel()
+
+ msg, err := consumer.Receive(ctx)
+ assert.NotNil(t, err)
+ assert.Nil(t, msg, "Receive should have failed with no keyreader")
+
+ // 4. Set consumer config to consume even if decryption fails
+ consumer.Close()
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subscriptionName,
+ Decryption: &MessageDecryptionInfo{
+ ConsumerCryptoFailureAction:
crypto.ConsumerCryptoFailureActionConsume,
+ },
+ Schema: NewStringSchema(nil),
+ })
+ assert.Nil(t, err)
+ assert.NotNil(t, consumer)
+
+ ctx2, cancel2 := context.WithTimeout(context.Background(),
2*time.Second)
+ defer cancel2()
+
+ for i := 0; i < totalMessages-1; i++ {
+ expectedMessage := fmt.Sprintf(messageFormat, i)
+ msg, err = consumer.Receive(ctx2)
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+
+ receivedMsg := string(msg.Payload())
+ assert.NotEqual(t, expectedMessage, receivedMsg,
fmt.Sprintf(`Received encrypted message [%v]
+ should not match the expected message [%v]`, expectedMessage,
receivedMsg))
+ // verify the message contains Encryption context
+ assert.NotEmpty(t, msg.GetEncryptionContext(),
+ "Encrypted message which is failed to decrypt must
contain EncryptionContext")
+ consumer.Ack(msg)
+ }
+
+ // 5. discard action on decryption failure
+ consumer.Close()
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subscriptionName,
+ Decryption: &MessageDecryptionInfo{
+ ConsumerCryptoFailureAction:
crypto.ConsumerCryptoFailureActionDiscard,
+ },
+ Schema: NewStringSchema(nil),
+ })
+ assert.Nil(t, err)
+ assert.NotNil(t, consumer)
+
+ ctx3, cancel3 := context.WithTimeout(context.Background(),
3*time.Second)
+ defer cancel3()
+
+ msg, err = consumer.Receive(ctx3)
+ assert.NotNil(t, err)
+ assert.Nil(t, msg, "Message received even aftet
ConsumerCryptoFailureAction.Discard is set.")
+}
+
+func TestConsumerCompressionWithRSAEncryption(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := newTopicName()
+ ctx := context.Background()
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ CompressionType: LZ4,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ Keys: []string{"enc-compress-app.key"},
+ },
+ })
+
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: "sub-1",
+ Decryption: &MessageDecryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ },
+ })
+
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ const N = 100
+
+ for i := 0; i < N; i++ {
+ if _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ for i := 0; i < N; i++ {
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, err)
+ assert.Equal(t, fmt.Sprintf("msg-content-%d", i),
string(msg.Payload()))
+ consumer.Ack(msg)
+ }
+}
+
+func TestBatchMessageReceiveWithCompressionAndRSAEcnryption(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topicName := "persistent://public/default/receive-batch-comp-enc"
+ subName := "subscription-name"
+ prefix := "msg-batch-"
+ ctx := context.Background()
+
+ // Enable batching on producer side
+ batchSize, numOfMessages := 2, 100
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topicName,
+ BatchingMaxMessages: uint(batchSize),
+ DisableBatching: false,
+ CompressionType: LZ4,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ Keys: []string{"batch-encryption-app.key"},
+ },
+ })
+ assert.Nil(t, err)
+ assert.Equal(t, topicName, producer.Topic())
+ defer producer.Close()
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topicName,
+ SubscriptionName: subName,
+ Decryption: &MessageDecryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ },
+ })
+
+ assert.Nil(t, err)
+ defer consumer.Close()
+
+ count := 0
+ for i := 0; i < numOfMessages; i++ {
+ messageContent := prefix + fmt.Sprintf("%d", i)
+ msg := &ProducerMessage{
+ Payload: []byte(messageContent),
+ }
+ _, err := producer.Send(ctx, msg)
+ assert.Nil(t, err)
+ }
+
+ for i := 0; i < numOfMessages; i++ {
+ msg, err := consumer.Receive(ctx)
+ fmt.Printf("received : %v\n", string(msg.Payload()))
+ assert.Nil(t, err)
+ consumer.Ack(msg)
+ count++
+ }
+
+ assert.Equal(t, count, numOfMessages)
+}
+
+type EncKeyReader struct {
+ publicKeyPath string
+ privateKeyPath string
+ metaMap map[string]string
+}
+
+func NewEncKeyReader(publicKeyPath, privateKeyPath string) *EncKeyReader {
+ metaMap := map[string]string{
+ "version": "1.0",
+ }
+
+ return &EncKeyReader{
+ publicKeyPath: publicKeyPath,
+ privateKeyPath: privateKeyPath,
+ metaMap: metaMap,
+ }
+}
+
+// GetPublicKey read public key from the given path
+func (d *EncKeyReader) PublicKey(keyName string, keyMeta map[string]string)
(*crypto.EncryptionKeyInfo, error) {
+ return readKey(keyName, d.publicKeyPath, d.metaMap)
+}
+
+// GetPrivateKey read private key from the given path
+func (d *EncKeyReader) PrivateKey(keyName string, keyMeta map[string]string)
(*crypto.EncryptionKeyInfo, error) {
+ return readKey(keyName, d.privateKeyPath, d.metaMap)
+}
+
+func readKey(keyName, path string, keyMeta map[string]string)
(*crypto.EncryptionKeyInfo, error) {
+ key, err := ioutil.ReadFile(path)
+ if err != nil {
+ return nil, err
+ }
+ return crypto.NewEncryptionKeyInfo(keyName, key, keyMeta), nil
+}
+
+func TestConsumerEncryptionWithoutKeyReader(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ encryptionKeyName := "client-rsa.pem"
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader:
NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ Keys: []string{encryptionKeyName},
+ },
+ CompressionType: LZ4,
+ Schema: NewStringSchema(nil),
+ })
+
+ assert.Nil(t, err)
+ assert.NotNil(t, producer)
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: "my-subscription-name",
+ Decryption: &MessageDecryptionInfo{
+ ConsumerCryptoFailureAction:
crypto.ConsumerCryptoFailureActionConsume,
+ },
+ Schema: NewStringSchema(nil),
+ })
+ assert.Nil(t, err)
+
+ message := "my-message"
+
+ _, err = producer.Send(context.Background(), &ProducerMessage{
+ Value: message,
+ })
+ assert.Nil(t, err)
+
+ // consume encrypted message
+ msg, err := consumer.Receive(context.Background())
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+
+ // try to decrypt message
+ encCtx := msg.GetEncryptionContext()
+ assert.NotEmpty(t, encCtx)
+
+ keys := encCtx.Keys
+ assert.Equal(t, 1, len(keys))
+
+ encryptionKey, ok := keys[encryptionKeyName]
+ assert.True(t, ok)
+
+ encDataKey := encryptionKey.KeyValue
+ assert.NotNil(t, encDataKey)
+
+ metadata := encryptionKey.Metadata
+ assert.NotNil(t, metadata)
+
+ version := metadata["version"]
+ assert.Equal(t, "1.0", version)
+
+ compressionType := encCtx.CompressionType
+ uncompressedSize := uint32(encCtx.UncompressedSize)
+ encParam := encCtx.Param
+ encAlgo := encCtx.Algorithm
+ batchSize := encCtx.BatchSize
+
+ // try to decrypt using default MessageCrypto
+ msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", false,
plog.DefaultNopLogger())
+ assert.Nil(t, err)
+
+ producerName := "test"
+ sequenceID := uint64(123)
+ publishTime := uint64(12333453454)
+
+ messageMetaData := pb.MessageMetadata{
+ EncryptionParam: encParam,
+ ProducerName: &producerName,
+ SequenceId: &sequenceID,
+ PublishTime: &publishTime,
+ UncompressedSize: &uncompressedSize,
+ EncryptionAlgo: &encAlgo,
+ }
+
+ if compressionType == LZ4 {
+ messageMetaData.Compression = pb.CompressionType_LZ4.Enum()
+ }
+
+ messageMetaData.EncryptionKeys = []*pb.EncryptionKeys{{
+ Key: &encryptionKeyName,
+ Value: encDataKey,
+ }}
+
+ decryptedPayload, err :=
msgCrypto.Decrypt(crypto.NewMessageMetadataSupplier(&messageMetaData),
+ msg.Payload(),
+ NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"))
+ assert.Nil(t, err)
+ assert.NotNil(t, decryptedPayload)
+
+ // try to uncompress payload
+ uncompressedPayload := make([]byte, uncompressedSize)
+ s, err := lz4.UncompressBlock(decryptedPayload, uncompressedPayload)
+ assert.Nil(t, err)
+ assert.Equal(t, uncompressedSize, uint32(s))
+
+ buffer := internal.NewBufferWrapper(uncompressedPayload)
+
+ if batchSize > 0 {
+ size := buffer.ReadUint32()
+ var meta pb.SingleMessageMetadata
+ if err := proto.Unmarshal(buffer.Read(size), &meta); err != nil
{
+ fmt.Println(err)
+ }
+ d := buffer.Read(uint32(meta.GetPayloadSize()))
+ assert.Equal(t, message, string(d))
+ }
+}
+
+// TestEncryptDecryptRedeliveryOnFailure test redelivery failed messages
+func TestEncryptDecryptRedeliveryOnFailure(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceURL,
+ })
+ assert.Nil(t, err)
+
+ topic := newTopicName()
+ subcription := "test-subscription-redelivery"
+
+ consumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subcription,
+ Decryption: &MessageDecryptionInfo{
+ KeyReader:
NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_invalid_rsa.pem"),
+ },
+ })
+ assert.Nil(t, err)
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader:
NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ Keys: []string{"new-enc-key"},
+ },
+ })
+ assert.Nil(t, err)
+
+ producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("new-test-message"),
+ })
+
+ ctx, cancel := context.WithTimeout(context.Background(),
1000*time.Millisecond)
+ defer cancel()
+
+ // message receive should fail due to decryption error
+ msg, err := consumer.Receive(ctx)
+ assert.Nil(t, msg)
+ assert.NotNil(t, err)
+
+ consumer.Close()
+
+ // create consumer with same subscription and proper rsa key pairs
+ consumer, err = client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subcription,
+ Decryption: &MessageDecryptionInfo{
+ KeyReader:
NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ },
+ })
+ assert.Nil(t, err)
+
+ // previous message should be redelivered
+ msg, err = consumer.Receive(context.Background())
+ assert.Nil(t, err)
+ assert.NotNil(t, msg)
+ consumer.Ack(msg)
+}
diff --git a/pulsar/encryption.go b/pulsar/encryption.go
index aade2ca..3ab2527 100644
--- a/pulsar/encryption.go
+++ b/pulsar/encryption.go
@@ -34,3 +34,15 @@ type ProducerEncryptionInfo struct {
// default is ProducerCryptoFailureActionFail
ProducerCryptoFailureAction int
}
+
+// MessageDecryptionInfo encryption related fields required by the consumer to
decrypt the message
+type MessageDecryptionInfo struct {
+ // KeyReader read RSA public/private key pairs
+ KeyReader crypto.KeyReader
+
+ // MessageCrypto used to encrypt and decrypt the data and session keys
+ MessageCrypto crypto.MessageCrypto
+
+ // ConsumerCryptoFailureAction action to be taken on failure of message
decryption
+ ConsumerCryptoFailureAction int
+}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index c4f215c..19fa6d8 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -201,6 +201,24 @@ func timeFromUnixTimestampMillis(timestamp uint64)
time.Time {
return time.Unix(seconds, nanos)
}
+// EncryptionContext
+// It will be used to decrypt message outside of this client
+type EncryptionContext struct {
+ Keys map[string]EncryptionKey
+ Param []byte
+ Algorithm string
+ CompressionType CompressionType
+ UncompressedSize int
+ BatchSize int
+}
+
+// EncryptionKey
+// Encryption key used to encrypt the message payload
+type EncryptionKey struct {
+ KeyValue []byte
+ Metadata map[string]string
+}
+
type message struct {
publishTime time.Time
eventTime time.Time
@@ -215,6 +233,7 @@ type message struct {
replicatedFrom string
redeliveryCount uint32
schema Schema
+ encryptionContext *EncryptionContext
}
func (msg *message) Topic() string {
@@ -269,6 +288,10 @@ func (msg *message) ProducerName() string {
return msg.producerName
}
+func (msg *message) GetEncryptionContext() *EncryptionContext {
+ return msg.encryptionContext
+}
+
func newAckTracker(size int) *ackTracker {
var batchIDs *big.Int
if size <= 64 {
diff --git a/pulsar/internal/crypto/consumer_decryptor.go
b/pulsar/internal/crypto/consumer_decryptor.go
new file mode 100644
index 0000000..bbc1f9b
--- /dev/null
+++ b/pulsar/internal/crypto/consumer_decryptor.go
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+ "fmt"
+
+ "github.com/apache/pulsar-client-go/pulsar/crypto"
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+ "github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+type consumerDecryptor struct {
+ keyReader crypto.KeyReader
+ messageCrypto crypto.MessageCrypto
+ logger log.Logger
+}
+
+func NewConsumerDecryptor(keyReader crypto.KeyReader,
+ messageCrypto crypto.MessageCrypto,
+ logger log.Logger) Decryptor {
+ return &consumerDecryptor{
+ keyReader: keyReader,
+ messageCrypto: messageCrypto,
+ logger: logger,
+ }
+}
+
+func (d *consumerDecryptor) Decrypt(payload []byte,
+ msgID *pb.MessageIdData,
+ msgMetadata *pb.MessageMetadata) ([]byte, error) {
+ // encryption keys are not present in message metadta, no need decrypt
the payload
+ if len(msgMetadata.GetEncryptionKeys()) == 0 {
+ return payload, nil
+ }
+
+ // KeyReader interface is not implemented
+ if d.keyReader == nil {
+ return payload, fmt.Errorf("KeyReader interface is not
implemented")
+ }
+
+ return
d.messageCrypto.Decrypt(crypto.NewMessageMetadataSupplier(msgMetadata),
+ payload,
+ d.keyReader)
+}
diff --git a/pulsar/encryption.go b/pulsar/internal/crypto/decryptor.go
similarity index 56%
copy from pulsar/encryption.go
copy to pulsar/internal/crypto/decryptor.go
index aade2ca..da67d5c 100644
--- a/pulsar/encryption.go
+++ b/pulsar/internal/crypto/decryptor.go
@@ -15,22 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-package pulsar
+package crypto
-import "github.com/apache/pulsar-client-go/pulsar/crypto"
+import (
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
-// ProducerEncryptionInfo encryption related fields required by the producer
-type ProducerEncryptionInfo struct {
- // KeyReader read RSA public/private key pairs
- KeyReader crypto.KeyReader
-
- // MessageCrypto used to encrypt and decrypt the data and session keys
- MessageCrypto crypto.MessageCrypto
-
- // Keys list of encryption key names to encrypt session key
- Keys []string
-
- // ProducerCryptoFailureAction action to be taken on failure of message
encryption
- // default is ProducerCryptoFailureActionFail
- ProducerCryptoFailureAction int
+// Decryptor support decrypting of message
+type Decryptor interface {
+ Decrypt(payload []byte, msgID *pb.MessageIdData, msgMetadata
*pb.MessageMetadata) ([]byte, error)
}
diff --git a/pulsar/encryption.go b/pulsar/internal/crypto/noop_decryptor.go
similarity index 56%
copy from pulsar/encryption.go
copy to pulsar/internal/crypto/noop_decryptor.go
index aade2ca..c049c47 100644
--- a/pulsar/encryption.go
+++ b/pulsar/internal/crypto/noop_decryptor.go
@@ -15,22 +15,26 @@
// specific language governing permissions and limitations
// under the License.
-package pulsar
+package crypto
-import "github.com/apache/pulsar-client-go/pulsar/crypto"
+import (
+ "fmt"
-// ProducerEncryptionInfo encryption related fields required by the producer
-type ProducerEncryptionInfo struct {
- // KeyReader read RSA public/private key pairs
- KeyReader crypto.KeyReader
+ pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
- // MessageCrypto used to encrypt and decrypt the data and session keys
- MessageCrypto crypto.MessageCrypto
+type noopDecryptor struct{}
- // Keys list of encryption key names to encrypt session key
- Keys []string
+func NewNoopDecryptor() Decryptor {
+ return &noopDecryptor{}
+}
- // ProducerCryptoFailureAction action to be taken on failure of message
encryption
- // default is ProducerCryptoFailureActionFail
- ProducerCryptoFailureAction int
+// Decrypt noop decryptor
+func (d *noopDecryptor) Decrypt(payload []byte,
+ msgID *pb.MessageIdData,
+ msgMetadata *pb.MessageMetadata) ([]byte, error) {
+ if len(msgMetadata.GetEncryptionKeys()) > 0 {
+ return payload, fmt.Errorf("incoming message payload is
encrypted, consumer is not configured to decrypt")
+ }
+ return payload, nil
}
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go
b/pulsar/internal/pulsartracing/message_carrier_util_test.go
index 9fe608d..677a7ff 100644
--- a/pulsar/internal/pulsartracing/message_carrier_util_test.go
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -119,3 +119,7 @@ func (msg *mockConsumerMessage) GetSchemaValue(v
interface{}) error {
func (msg *mockConsumerMessage) ProducerName() string {
return ""
}
+
+func (msg *mockConsumerMessage) GetEncryptionContext()
*pulsar.EncryptionContext {
+ return &pulsar.EncryptionContext{}
+}
diff --git a/pulsar/message.go b/pulsar/message.go
index 23dfefb..1003bb1 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -116,6 +116,10 @@ type Message interface {
//Get the de-serialized value of the message, according the configured
GetSchemaValue(v interface{}) error
+
+ // GetEncryptionContext get the ecryption context of message
+ // It will be used by the application to parse undecrypted message
+ GetEncryptionContext() *EncryptionContext
}
// MessageID identifier for a particular message
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 40234aa..7af3470 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -76,6 +76,9 @@ type ReaderOptions struct {
// ReadCompacted can only be enabled when reading from a persistent
topic. Attempting to enable it on non-persistent
// topics will lead to the reader create call throwing a
PulsarClientException.
ReadCompacted bool
+
+ // Decryption decryption related fields to decrypt the encrypted message
+ Decryption *MessageDecryptionInfo
}
// Reader can be used to scan through all the messages currently available in
a topic.
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 9983286..0e63f8b 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -89,6 +89,7 @@ func newReader(client *client, options ReaderOptions)
(Reader, error) {
metadata: options.Properties,
nackRedeliveryDelay: defaultNackRedeliveryDelay,
replicateSubscriptionState: false,
+ decryption: options.Decryption,
}
reader := &reader{
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 618f5ab..bdafea0 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -23,6 +23,7 @@ import (
"testing"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/crypto"
"github.com/stretchr/testify/assert"
)
@@ -654,3 +655,58 @@ func TestReaderWithMultiHosts(t *testing.T) {
assert.Equal(t, 10, i)
}
+
+func TestProducerReaderRSAEncryption(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: lookupURL,
+ })
+
+ assert.Nil(t, err)
+ defer client.Close()
+
+ topic := newTopicName()
+ ctx := context.Background()
+
+ // create reader
+ reader, err := client.CreateReader(ReaderOptions{
+ Topic: topic,
+ StartMessageID: EarliestMessageID(),
+ Decryption: &MessageDecryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ ConsumerCryptoFailureAction:
crypto.ConsumerCryptoFailureActionFail,
+ },
+ })
+ assert.Nil(t, err)
+ defer reader.Close()
+
+ // create producer
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: topic,
+ Encryption: &ProducerEncryptionInfo{
+ KeyReader:
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+ "crypto/testdata/pri_key_rsa.pem"),
+ ProducerCryptoFailureAction:
crypto.ProducerCryptoFailureActionFail,
+ Keys: []string{"client-rsa.pem"},
+ },
+ })
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ // send 10 messages
+ for i := 0; i < 10; i++ {
+ _, err := producer.Send(ctx, &ProducerMessage{
+ Payload: []byte(fmt.Sprintf("hello-%d", i)),
+ })
+ assert.NoError(t, err)
+ }
+
+ // receive 10 messages
+ for i := 0; i < 10; i++ {
+ msg, err := reader.Next(context.Background())
+ assert.NoError(t, err)
+
+ expectMsg := fmt.Sprintf("hello-%d", i)
+ assert.Equal(t, []byte(expectMsg), msg.Payload())
+ }
+}