This is an automated email from the ASF dual-hosted git repository.

cckellogg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d3a9cc  Encryption support  producer (#560)
1d3a9cc is described below

commit 1d3a9cc80e63e67a9769d351539ac111c55e56ae
Author: Garule Prabhudas <[email protected]>
AuthorDate: Fri Sep 3 21:37:24 2021 +0530

    Encryption support  producer (#560)
    
    * add ability to encrypt messages
    - use base crypto package for encryption
    
    * fix typo
    
    * lint fixes
    
    * address review suggestions
    
    * revert go mod
    
    * remove encryption context
     - move it to Consumer MR
    
    * try to fix check issues
    
    * remove unused code
    
    * remove embedded crypto struct
    
    * review suggestions
    
    * remove duplicate log
    
    * lint code style issue fix
    
    * return error from flush methods on serialization error
    
    * update test case and do lazy data key generation
    
    * address review changes
    
    * add comments on test case
    
    Co-authored-by: PGarule <[email protected]>
---
 go.mod                                       |   2 -
 go.sum                                       |  15 ----
 pulsar/encryption.go                         |  36 +++++++++
 pulsar/internal/batch_builder.go             |  34 +++++----
 pulsar/internal/commands.go                  |  26 +++++--
 pulsar/internal/crypto/encryptor.go          |  27 +++++++
 pulsar/internal/crypto/noop_encryptor.go     |  33 ++++++++
 pulsar/internal/crypto/producer_encryptor.go |  73 ++++++++++++++++++
 pulsar/internal/key_based_batch_builder.go   |  19 +++--
 pulsar/producer.go                           |   3 +
 pulsar/producer_partition.go                 |  64 +++++++++++++++-
 pulsar/producer_test.go                      | 109 +++++++++++++++++++++++++++
 12 files changed, 390 insertions(+), 51 deletions(-)

diff --git a/go.mod b/go.mod
index 67a7bb7..354f5b4 100644
--- a/go.mod
+++ b/go.mod
@@ -14,7 +14,6 @@ require (
        github.com/google/uuid v1.1.2
        github.com/inconshreveable/mousetrap v1.0.0 // indirect
        github.com/klauspost/compress v1.10.8
-       github.com/kr/pretty v0.2.0 // indirect
        github.com/linkedin/goavro/v2 v2.9.8
        github.com/opentracing/opentracing-go v1.2.0
        github.com/pierrec/lz4 v2.0.5+incompatible
@@ -27,7 +26,6 @@ require (
        github.com/stretchr/testify v1.5.1
        go.uber.org/atomic v1.7.0
        golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
-       gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
 )
 
 replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2
diff --git a/go.sum b/go.sum
index 2ccd39e..8b372c7 100644
--- a/go.sum
+++ b/go.sum
@@ -22,7 +22,6 @@ github.com/beorn7/perks v1.0.1 
h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
 github.com/beorn7/perks v1.0.1/go.mod 
h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b 
h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod 
h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
-github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod 
h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4=
 github.com/cespare/xxhash/v2 v2.1.1 
h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod 
h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/danieljoos/wincred v1.0.2 
h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
@@ -32,8 +31,6 @@ github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod 
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible 
h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod 
h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
-github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1 
h1:CaO/zOnF8VvUfEbhRatPcwKVWamvbYd8tQGRWacE9kU=
-github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod 
h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4=
 github.com/dimfeld/httptreemux v5.0.1+incompatible 
h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
 github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod 
h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
 github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b 
h1:HBah4D48ypg3J7Np4N+HY/ZR76fx3HEUGxDU6Uk39oQ=
@@ -73,8 +70,6 @@ github.com/google/go-cmp v0.4.0/go.mod 
h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
 github.com/google/uuid v1.1.2/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
-github.com/gorilla/context v1.1.1/go.mod 
h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
-github.com/gorilla/mux v1.7.3/go.mod 
h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/gorilla/mux v1.7.4/go.mod 
h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c 
h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod 
h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
@@ -82,7 +77,6 @@ github.com/hpcloud/tail v1.0.0/go.mod 
h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
 github.com/inconshreveable/mousetrap v1.0.0 
h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod 
h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
 github.com/jawher/mow.cli v1.0.4/go.mod 
h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
-github.com/jawher/mow.cli v1.1.0/go.mod 
h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg=
 github.com/jawher/mow.cli v1.2.0/go.mod 
h1:y+pcA3jBAdo/GIZx/0rFjw/K2bVEODP9rfZOfaiq8Ko=
 github.com/jmespath/go-jmespath v0.3.0/go.mod 
h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
 github.com/json-iterator/go v1.1.6/go.mod 
h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
@@ -169,8 +163,6 @@ github.com/stretchr/testify v1.4.0 
h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
 github.com/stretchr/testify v1.4.0/go.mod 
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.5.1 
h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
 github.com/stretchr/testify v1.5.1/go.mod 
h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
-github.com/stretchr/testify v1.7.0 
h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
-github.com/stretchr/testify v1.7.0/go.mod 
h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/yuin/goldmark v1.1.27/go.mod 
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.2.1/go.mod 
h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
@@ -190,7 +182,6 @@ golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod 
h1:mL1N/T3taQHkDXs73r
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod 
h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod 
h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
@@ -212,7 +203,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod 
h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -227,7 +217,6 @@ golang.org/x/text v0.3.2/go.mod 
h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
-golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod 
h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
 golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod 
h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
 golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod 
h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@@ -259,9 +248,5 @@ gopkg.in/yaml.v2 v2.2.1/go.mod 
h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
 gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
-gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b 
h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
-gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/pulsar/encryption.go b/pulsar/encryption.go
new file mode 100644
index 0000000..aade2ca
--- /dev/null
+++ b/pulsar/encryption.go
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import "github.com/apache/pulsar-client-go/pulsar/crypto"
+
+// ProducerEncryptionInfo encryption related fields required by the producer
+type ProducerEncryptionInfo struct {
+       // KeyReader read RSA public/private key pairs
+       KeyReader crypto.KeyReader
+
+       // MessageCrypto used to encrypt and decrypt the data and session keys
+       MessageCrypto crypto.MessageCrypto
+
+       // Keys list of encryption key names to encrypt session key
+       Keys []string
+
+       // ProducerCryptoFailureAction action to be taken on failure of message 
encryption
+       // default is ProducerCryptoFailureActionFail
+       ProducerCryptoFailureAction int
+}
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 3e1601f..92d6249 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -23,6 +23,7 @@ import (
        "github.com/gogo/protobuf/proto"
 
        "github.com/apache/pulsar-client-go/pulsar/internal/compression"
+       "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/apache/pulsar-client-go/pulsar/log"
 )
@@ -35,7 +36,7 @@ type BuffersPool interface {
 type BatcherBuilderProvider func(
        maxMessages uint, maxBatchSize uint, producerName string, producerID 
uint64,
        compressionType pb.CompressionType, level compression.Level,
-       bufferPool BuffersPool, logger log.Logger,
+       bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
 ) (BatchBuilder, error)
 
 // BatchBuilder is a interface of batch builders
@@ -51,12 +52,12 @@ type BatchBuilder interface {
        ) bool
 
        // Flush all the messages buffered in the client and wait until all 
messages have been successfully persisted.
-       Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{})
+       Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}, 
err error)
 
        // Flush all the messages buffered in multiple batches and wait until 
all
        // messages have been successfully persisted.
        FlushBatches() (
-               batchData []Buffer, sequenceID []uint64, callbacks 
[][]interface{},
+               batchData []Buffer, sequenceID []uint64, callbacks 
[][]interface{}, errors []error,
        )
 
        // Return the batch container batch message in multiple batches.
@@ -93,13 +94,15 @@ type batchContainer struct {
        buffersPool         BuffersPool
 
        log log.Logger
+
+       encryptor crypto.Encryptor
 }
 
 // newBatchContainer init a batchContainer
 func newBatchContainer(
        maxMessages uint, maxBatchSize uint, producerName string, producerID 
uint64,
        compressionType pb.CompressionType, level compression.Level,
-       bufferPool BuffersPool, logger log.Logger,
+       bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
 ) batchContainer {
 
        bc := batchContainer{
@@ -122,6 +125,7 @@ func newBatchContainer(
                compressionProvider: getCompressionProvider(compressionType, 
level),
                buffersPool:         bufferPool,
                log:                 logger,
+               encryptor:           encryptor,
        }
 
        if compressionType != pb.CompressionType_NONE {
@@ -135,12 +139,12 @@ func newBatchContainer(
 func NewBatchBuilder(
        maxMessages uint, maxBatchSize uint, producerName string, producerID 
uint64,
        compressionType pb.CompressionType, level compression.Level,
-       bufferPool BuffersPool, logger log.Logger,
+       bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
 ) (BatchBuilder, error) {
 
        bc := newBatchContainer(
                maxMessages, maxBatchSize, producerName, producerID, 
compressionType,
-               level, bufferPool, logger,
+               level, bufferPool, logger, encryptor,
        )
 
        return &bc, nil
@@ -211,11 +215,11 @@ func (bc *batchContainer) reset() {
 
 // Flush all the messages buffered in the client and wait until all messages 
have been successfully persisted.
 func (bc *batchContainer) Flush() (
-       batchData Buffer, sequenceID uint64, callbacks []interface{},
+       batchData Buffer, sequenceID uint64, callbacks []interface{}, err error,
 ) {
        if bc.numMessages == 0 {
                // No-Op for empty batch
-               return nil, 0, nil
+               return nil, 0, nil, nil
        }
        bc.log.Debug("BatchBuilder flush: messages: ", bc.numMessages)
 
@@ -229,19 +233,21 @@ func (bc *batchContainer) Flush() (
        if buffer == nil {
                buffer = NewBuffer(int(uncompressedSize * 3 / 2))
        }
-       serializeBatch(
-               buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, 
bc.compressionProvider,
-       )
+
+       if err = serializeBatch(
+               buffer, bc.cmdSend, bc.msgMetadata, bc.buffer, 
bc.compressionProvider, bc.encryptor,
+       ); err == nil { // no error in serializing Batch
+               sequenceID = bc.cmdSend.Send.GetSequenceId()
+       }
 
        callbacks = bc.callbacks
-       sequenceID = bc.cmdSend.Send.GetSequenceId()
        bc.reset()
-       return buffer, sequenceID, callbacks
+       return buffer, sequenceID, callbacks, err
 }
 
 // FlushBatches only for multiple batches container
 func (bc *batchContainer) FlushBatches() (
-       batchData []Buffer, sequenceID []uint64, callbacks [][]interface{},
+       batchData []Buffer, sequenceID []uint64, callbacks [][]interface{}, 
errors []error,
 ) {
        panic("single batch container not support FlushBatches(), please use 
Flush() instead")
 }
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index af6bac5..b91c0b6 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -24,6 +24,7 @@ import (
        "github.com/gogo/protobuf/proto"
 
        "github.com/apache/pulsar-client-go/pulsar/internal/compression"
+       "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 )
 
@@ -221,9 +222,21 @@ func serializeBatch(wb Buffer,
        cmdSend *pb.BaseCommand,
        msgMetadata *pb.MessageMetadata,
        uncompressedPayload Buffer,
-       compressionProvider compression.Provider) {
+       compressionProvider compression.Provider,
+       encryptor crypto.Encryptor) error {
        // Wire format
        // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] 
[METADATA_SIZE][METADATA] [PAYLOAD]
+
+       // compress the payload
+       compressedPayload := compressionProvider.Compress(nil, 
uncompressedPayload.ReadableSlice())
+
+       // encrypt the compressed payload
+       encryptedPayload, err := encryptor.Encrypt(compressedPayload, 
msgMetadata)
+       if err != nil {
+               // error occurred while encrypting the payload, 
ProducerCryptoFailureAction is set to Fail
+               return fmt.Errorf("encryption of message failed, 
ProducerCryptoFailureAction is set to Fail. Error :%v", err)
+       }
+
        cmdSize := uint32(proto.Size(cmdSend))
        msgMetadataSize := uint32(proto.Size(msgMetadata))
 
@@ -234,7 +247,7 @@ func serializeBatch(wb Buffer,
        // Write cmd
        wb.WriteUint32(cmdSize)
        wb.ResizeIfNeeded(cmdSize)
-       _, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
+       _, err = cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
        if err != nil {
                panic(fmt.Sprintf("Protobuf error when serializing cmdSend: 
%v", err))
        }
@@ -255,12 +268,8 @@ func serializeBatch(wb Buffer,
        }
        wb.WrittenBytes(msgMetadataSize)
 
-       // Make sure the buffer has enough space to hold the compressed data
-       // and perform the compression in-place
-       maxSize := 
uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes())))
-       wb.ResizeIfNeeded(maxSize)
-       b := compressionProvider.Compress(wb.WritableSlice()[:0], 
uncompressedPayload.ReadableSlice())
-       wb.WrittenBytes(uint32(len(b)))
+       // add payload to the buffer
+       wb.Write(encryptedPayload)
 
        // Write checksum at created checksum-placeholder
        frameEndIdx := wb.WriterIndex()
@@ -269,6 +278,7 @@ func serializeBatch(wb Buffer,
        // Set Sizes and checksum in the fixed-size header
        wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame
        wb.PutUint32(checksum, checksumIdx)
+       return nil
 }
 
 // ConvertFromStringMap convert a string map to a KeyValue []byte
diff --git a/pulsar/internal/crypto/encryptor.go 
b/pulsar/internal/crypto/encryptor.go
new file mode 100644
index 0000000..7fdbf06
--- /dev/null
+++ b/pulsar/internal/crypto/encryptor.go
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
+
+// Encryptor support encryption
+type Encryptor interface {
+       Encrypt([]byte, *pb.MessageMetadata) ([]byte, error)
+}
diff --git a/pulsar/internal/crypto/noop_encryptor.go 
b/pulsar/internal/crypto/noop_encryptor.go
new file mode 100644
index 0000000..4512e7b
--- /dev/null
+++ b/pulsar/internal/crypto/noop_encryptor.go
@@ -0,0 +1,33 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
+
+type noopEncryptor struct{}
+
+func NewNoopEncryptor() Encryptor {
+       return &noopEncryptor{}
+}
+
+// Encrypt Noop ecryptor
+func (e *noopEncryptor) Encrypt(data []byte, msgMetadata *pb.MessageMetadata) 
([]byte, error) {
+       return data, nil
+}
diff --git a/pulsar/internal/crypto/producer_encryptor.go 
b/pulsar/internal/crypto/producer_encryptor.go
new file mode 100644
index 0000000..a5b972d
--- /dev/null
+++ b/pulsar/internal/crypto/producer_encryptor.go
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+       "fmt"
+
+       "github.com/apache/pulsar-client-go/pulsar/crypto"
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       "github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+type producerEncryptor struct {
+       keys                        []string
+       keyReader                   crypto.KeyReader
+       messageCrypto               crypto.MessageCrypto
+       logger                      log.Logger
+       producerCryptoFailureAction int
+}
+
+func NewProducerEncryptor(keys []string,
+       keyReader crypto.KeyReader,
+       messageCrypto crypto.MessageCrypto,
+       producerCryptoFailureAction int,
+       logger log.Logger) Encryptor {
+       return &producerEncryptor{
+               keys:                        keys,
+               keyReader:                   keyReader,
+               messageCrypto:               messageCrypto,
+               logger:                      logger,
+               producerCryptoFailureAction: producerCryptoFailureAction,
+       }
+}
+
+// Encrypt producer encryptor
+func (e *producerEncryptor) Encrypt(payload []byte, msgMetadata 
*pb.MessageMetadata) ([]byte, error) {
+       // encrypt payload
+       encryptedPayload, err := e.messageCrypto.Encrypt(e.keys,
+               e.keyReader,
+               crypto.NewMessageMetadataSupplier(msgMetadata),
+               payload)
+
+       // error encryping the payload
+       if err != nil {
+               // error occurred in encrypting the payload
+               // crypto ProducerCryptoFailureAction is set to send
+               // send unencrypted message
+               if e.producerCryptoFailureAction == 
crypto.ProducerCryptoFailureActionSend {
+                       e.logger.
+                               WithError(err).
+                               Warnf("Encryption failed for payload sending 
unencrypted message ProducerCryptoFailureAction is set to send")
+                       return payload, nil
+               }
+
+               return nil, fmt.Errorf("ProducerCryptoFailureAction is set to 
Fail and error occurred in encrypting payload :%v", err)
+       }
+       return encryptedPayload, nil
+}
diff --git a/pulsar/internal/key_based_batch_builder.go 
b/pulsar/internal/key_based_batch_builder.go
index 545c2c8..940aa9f 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -24,6 +24,7 @@ import (
        "time"
 
        "github.com/apache/pulsar-client-go/pulsar/internal/compression"
+       "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/apache/pulsar-client-go/pulsar/log"
 )
@@ -85,14 +86,14 @@ func (h *keyBasedBatches) Val(key string) *batchContainer {
 func NewKeyBasedBatchBuilder(
        maxMessages uint, maxBatchSize uint, producerName string, producerID 
uint64,
        compressionType pb.CompressionType, level compression.Level,
-       bufferPool BuffersPool, logger log.Logger,
+       bufferPool BuffersPool, logger log.Logger, encryptor crypto.Encryptor,
 ) (BatchBuilder, error) {
 
        bb := &keyBasedBatchContainer{
                batches: newKeyBasedBatches(),
                batchContainer: newBatchContainer(
                        maxMessages, maxBatchSize, producerName, producerID,
-                       compressionType, level, bufferPool, logger,
+                       compressionType, level, bufferPool, logger, encryptor,
                ),
                compressionType: compressionType,
                level:           level,
@@ -144,7 +145,7 @@ func (bc *keyBasedBatchContainer) Add(
                // create batchContainer for new key
                t := newBatchContainer(
                        bc.maxMessages, bc.maxBatchSize, bc.producerName, 
bc.producerID,
-                       bc.compressionType, bc.level, bc.buffersPool, bc.log,
+                       bc.compressionType, bc.level, bc.buffersPool, bc.log, 
bc.encryptor,
                )
                batchPart = &t
                bc.batches.Add(msgKey, &t)
@@ -179,11 +180,11 @@ func (bc *keyBasedBatchContainer) reset() {
 // Flush all the messages buffered in multiple batches and wait until all
 // messages have been successfully persisted.
 func (bc *keyBasedBatchContainer) FlushBatches() (
-       batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{},
+       batchesData []Buffer, sequenceIDs []uint64, callbacks [][]interface{}, 
errors []error,
 ) {
        if bc.numMessages == 0 {
                // No-Op for empty batch
-               return nil, nil, nil
+               return nil, nil, nil, nil
        }
 
        bc.log.Debug("keyBasedBatchContainer flush: messages: ", bc.numMessages)
@@ -194,6 +195,7 @@ func (bc *keyBasedBatchContainer) FlushBatches() (
        batchesData = make([]Buffer, batchesLen)
        sequenceIDs = make([]uint64, batchesLen)
        callbacks = make([][]interface{}, batchesLen)
+       errors = make([]error, batchesLen)
 
        bc.batches.l.RLock()
        defer bc.batches.l.RUnlock()
@@ -203,21 +205,22 @@ func (bc *keyBasedBatchContainer) FlushBatches() (
        sort.Strings(sortedKeys)
        for _, k := range sortedKeys {
                container := bc.batches.containers[k]
-               b, s, c := container.Flush()
+               b, s, c, err := container.Flush()
                if b != nil {
                        batchesData[idx] = b
                        sequenceIDs[idx] = s
                        callbacks[idx] = c
+                       errors[idx] = err
                }
                idx++
        }
 
        bc.reset()
-       return batchesData, sequenceIDs, callbacks
+       return batchesData, sequenceIDs, callbacks, errors
 }
 
 func (bc *keyBasedBatchContainer) Flush() (
-       batchData Buffer, sequenceID uint64, callbacks []interface{},
+       batchData Buffer, sequenceID uint64, callbacks []interface{}, err error,
 ) {
        panic("multi batches container not support Flush(), please use 
FlushBatches() instead")
 }
diff --git a/pulsar/producer.go b/pulsar/producer.go
index ffbdebb..07a8f75 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -163,6 +163,9 @@ type ProducerOptions struct {
        // PartitionsAutoDiscoveryInterval is the time interval for the 
background process to discover new partitions
        // Default is 1 minute
        PartitionsAutoDiscoveryInterval time.Duration
+
+       // Encryption necessary fields to perform encryption of message
+       Encryption *ProducerEncryptionInfo
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index ca6850d..4ae4e00 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,11 +19,14 @@ package pulsar
 
 import (
        "context"
+       "fmt"
        "sync"
        "sync/atomic"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/crypto"
        "github.com/apache/pulsar-client-go/pulsar/internal/compression"
+       internalcrypto 
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
 
        "github.com/gogo/protobuf/proto"
 
@@ -130,6 +133,24 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
                p.producerName = options.Name
        }
 
+       encryption := options.Encryption
+       // add default message crypto if not provided
+       if encryption != nil && len(encryption.Keys) > 0 {
+               if encryption.KeyReader == nil {
+                       return nil, fmt.Errorf("encryption is enabled, 
KeyReader can not be nil")
+               }
+
+               if encryption.MessageCrypto == nil {
+                       logCtx := fmt.Sprintf("[%v] [%v] [%v]", p.topic, 
p.producerName, p.producerID)
+                       messageCrypto, err := 
crypto.NewDefaultMessageCrypto(logCtx, true, logger)
+                       if err != nil {
+                               logger.WithError(err).Error("Unable to get 
MessageCrypto instance. Producer creation is abandoned")
+                               return nil, err
+                       }
+                       p.options.Encryption.MessageCrypto = messageCrypto
+               }
+       }
+
        err := p.grabCnx()
        if err != nil {
                logger.WithError(err).Error("Failed to create producer")
@@ -147,6 +168,7 @@ func newPartitionProducer(client *client, topic string, 
options *ProducerOptions
        if p.options.SendTimeout > 0 {
                go p.failTimeoutMessages()
        }
+
        go p.runEventsLoop()
 
        return p, nil
@@ -205,13 +227,25 @@ func (p *partitionProducer) grabCnx() error {
        }
 
        p.producerName = res.Response.ProducerSuccess.GetProducerName()
+
+       var encryptor internalcrypto.Encryptor
+       if p.options.Encryption != nil {
+               encryptor = 
internalcrypto.NewProducerEncryptor(p.options.Encryption.Keys,
+                       p.options.Encryption.KeyReader,
+                       p.options.Encryption.MessageCrypto,
+                       p.options.Encryption.ProducerCryptoFailureAction, p.log)
+       } else {
+               encryptor = internalcrypto.NewNoopEncryptor()
+       }
+
        if p.options.DisableBatching {
                provider, _ := GetBatcherBuilderProvider(DefaultBatchBuilder)
                p.batchBuilder, err = provider(p.options.BatchingMaxMessages, 
p.options.BatchingMaxSize,
                        p.producerName, p.producerID, 
pb.CompressionType(p.options.CompressionType),
                        compression.Level(p.options.CompressionLevel),
                        p,
-                       p.log)
+                       p.log,
+                       encryptor)
                if err != nil {
                        return err
                }
@@ -225,7 +259,8 @@ func (p *partitionProducer) grabCnx() error {
                        p.producerName, p.producerID, 
pb.CompressionType(p.options.CompressionType),
                        compression.Level(p.options.CompressionLevel),
                        p,
-                       p.log)
+                       p.log,
+                       encryptor)
                if err != nil {
                        return err
                }
@@ -470,11 +505,22 @@ type pendingItem struct {
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
-       batchData, sequenceID, callbacks := p.batchBuilder.Flush()
+       batchData, sequenceID, callbacks, err := p.batchBuilder.Flush()
        if batchData == nil {
                return
        }
 
+       // error occurred in batch flush
+       // report it using callback
+       if err != nil {
+               for _, cb := range callbacks {
+                       if sr, ok := cb.(*sendRequest); ok {
+                               sr.callback(nil, sr.msg, err)
+                       }
+               }
+               return
+       }
+
        p.pendingQueue.Put(&pendingItem{
                sentAt:       time.Now(),
                batchData:    batchData,
@@ -589,12 +635,22 @@ func (p *partitionProducer) failTimeoutMessages() {
 }
 
 func (p *partitionProducer) internalFlushCurrentBatches() {
-       batchesData, sequenceIDs, callbacks := p.batchBuilder.FlushBatches()
+       batchesData, sequenceIDs, callbacks, errors := 
p.batchBuilder.FlushBatches()
        if batchesData == nil {
                return
        }
 
        for i := range batchesData {
+               // error occurred in processing batch
+               // report it using callback
+               if errors[i] != nil {
+                       for _, cb := range callbacks[i] {
+                               if sr, ok := cb.(*sendRequest); ok {
+                                       sr.callback(nil, sr.msg, errors[i])
+                               }
+                       }
+                       continue
+               }
                if batchesData[i] == nil {
                        continue
                }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index dc7a5ef..f914017 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -30,6 +30,8 @@ import (
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/stretchr/testify/assert"
 
+       "github.com/apache/pulsar-client-go/pulsar/crypto"
+       plog "github.com/apache/pulsar-client-go/pulsar/log"
        log "github.com/sirupsen/logrus"
 )
 
@@ -996,6 +998,113 @@ func TestSendContextExpired(t *testing.T) {
        makeHTTPCall(t, http.MethodDelete, quotaURL, "")
 }
 
+func TestProducerWithRSAEncryption(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", true, 
plog.DefaultNopLogger())
+       assert.Nil(t, err)
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: 
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       MessageCrypto: msgCrypto,
+                       Keys:          []string{"my-app.key"},
+               },
+               Schema: NewStringSchema(nil),
+       })
+
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               if _, err := producer.Send(ctx, &ProducerMessage{
+                       Value: fmt.Sprintf("hello-%d", i),
+               }); err != nil {
+                       log.Fatal(err)
+               }
+       }
+}
+
+func TestProducuerCreationFailOnNilKeyReader(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", true, 
plog.DefaultNopLogger())
+       assert.Nil(t, err)
+
+       // create producer
+       // Producer creation should fail as keyreader is nil
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+               Encryption: &ProducerEncryptionInfo{
+                       MessageCrypto: msgCrypto,
+                       Keys:          []string{"my-app.key"},
+               },
+               Schema: NewStringSchema(nil),
+       })
+
+       assert.NotNil(t, err)
+       assert.Nil(t, producer)
+}
+
+func TestProducuerSendFailOnInvalidKey(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+
+       msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", true, 
plog.DefaultNopLogger())
+       assert.Nil(t, err)
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: false,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: 
crypto.NewFileKeyReader("crypto/testdata/invalid_pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       MessageCrypto: msgCrypto,
+                       Keys:          []string{"my-app.key"},
+               },
+               Schema: NewStringSchema(nil),
+       })
+
+       assert.Nil(t, err)
+       assert.NotNil(t, producer)
+
+       // producer should send return an error as keyreader is configured with 
wrong pub.key and fail while encrypting message
+       mid, err := producer.Send(context.Background(), &ProducerMessage{
+               Value: "test",
+       })
+
+       assert.NotNil(t, err)
+       assert.Nil(t, mid)
+}
+
 type noopProduceInterceptor struct{}
 
 func (noopProduceInterceptor) BeforeSend(producer Producer, message 
*ProducerMessage) {}

Reply via email to