GPrabhudas commented on a change in pull request #560:
URL: https://github.com/apache/pulsar-client-go/pull/560#discussion_r673202478



##########
File path: pulsar/internal/batch_builder.go
##########
@@ -128,24 +137,56 @@ func newBatchContainer(
                bc.msgMetadata.Compression = &compressionType
        }
 
+       for _, opt := range options {
+               opt(&bc)
+       }
+
        return bc
 }
 
 // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
 func NewBatchBuilder(
        maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
        compressionType pb.CompressionType, level compression.Level,
-       bufferPool BuffersPool, logger log.Logger,
+       bufferPool BuffersPool, logger log.Logger, options ...func(*batchContainer),
 ) (BatchBuilder, error) {
 
        bc := newBatchContainer(
                maxMessages, maxBatchSize, producerName, producerID, compressionType,
-               level, bufferPool, logger,
+               level, bufferPool, logger, options...,
        )
 
        return &bc, nil
 }
 
+// UseEncryptionKeys encryption key names to use

Review comment:
       Done

##########
File path: pulsar/producer_partition.go
##########
@@ -197,30 +248,32 @@ func (p *partitionProducer) grabCnx() error {
        }
 
        p.producerName = res.Response.ProducerSuccess.GetProducerName()
+
+       var provider internal.BatcherBuilderProvider
        if p.options.DisableBatching {
-               provider, _ := GetBatcherBuilderProvider(DefaultBatchBuilder)
-               p.batchBuilder, err = provider(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
-                       p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
-                       compression.Level(p.options.CompressionLevel),
-                       p,
-                       p.log)
-               if err != nil {
-                       return err
-               }
+               provider, _ = GetBatcherBuilderProvider(DefaultBatchBuilder)
+
        } else if p.batchBuilder == nil {
-               provider, err := GetBatcherBuilderProvider(p.options.BatcherBuilderType)

Review comment:
       reverted

##########
File path: pulsar/producer_partition.go
##########
@@ -148,6 +179,26 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
        return p, nil
 }
 
+func (p *partitionProducer) sheduleDataKeyUpdate() {
+       for t := range p.dataKeyTicker.C {
+               p.log.Infof("Refreshing data key :%v", t)
+               err := p.updateDataKey()
+               if err != nil {
+                       p.log.Errorf("Error refreshing data key : %v", err)
+               }
+       }
+}
+
+func (p *partitionProducer) updateDataKey() error {

Review comment:
       done

##########
File path: pulsar/internal/commands.go
##########
@@ -269,6 +270,117 @@ func serializeBatch(wb Buffer,
        wb.PutUint32(checksum, checksumIdx)
 }
 
+// copy of the method serializeBatch(....) with an extension to encrypt payload
+func serializeBatchWithEncryption(wb Buffer,
+       cmdSend *pb.BaseCommand,
+       msgMetadata *pb.MessageMetadata,
+       uncompressedPayload Buffer,
+       compressionProvider compression.Provider,
+       KeyReader crypto.KeyReader,
+       encryptionKeys []string,
+       msgCrypto crypto.MessageCrypto,
+       cryptoFailureAction int,
+) {
+       // Wire format
+       // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
+
+       // compress the payload
+       compressedPayload := compressionProvider.Compress(nil, uncompressedPayload.ReadableSlice())
+
+       encryptedPayload := encryptPayload(msgMetadata,
+               msgCrypto,
+               KeyReader,
+               encryptionKeys,
+               compressedPayload,
+               cryptoFailureAction)
+
+       // there was a error in encrypting the payload and
+       // crypto failure action is set to crypto.ProducerCryptoFailureActionFail
+       if encryptedPayload == nil {
+               panic(fmt.Errorf("error in encrypting the payload and message is not sent"))
+       }
+
+       compressedPayload = encryptedPayload
+
+       cmdSize := uint32(proto.Size(cmdSend))
+       msgMetadataSize := uint32(proto.Size(msgMetadata))
+
+       frameSizeIdx := wb.WriterIndex()
+       wb.WriteUint32(0) // Skip frame size until we now the size
+       frameStartIdx := wb.WriterIndex()
+
+       // Write cmd
+       wb.WriteUint32(cmdSize)
+       wb.ResizeIfNeeded(cmdSize)
+       _, err := cmdSend.MarshalToSizedBuffer(wb.WritableSlice()[:cmdSize])
+       if err != nil {
+               panic(fmt.Sprintf("Protobuf error when serializing cmdSend: %v", err))
+       }
+       wb.WrittenBytes(cmdSize)
+
+       // Create checksum placeholder
+       wb.WriteUint16(magicCrc32c)
+       checksumIdx := wb.WriterIndex()
+       wb.WriteUint32(0) // skip 4 bytes of checksum
+
+       // Write metadata
+       metadataStartIdx := wb.WriterIndex()
+       wb.WriteUint32(msgMetadataSize)
+       wb.ResizeIfNeeded(msgMetadataSize)
+       _, err = msgMetadata.MarshalToSizedBuffer(wb.WritableSlice()[:msgMetadataSize])
+       if err != nil {
+               panic(fmt.Sprintf("Protobuf error when serializing msgMetadata: %v", err))
+       }
+       wb.WrittenBytes(msgMetadataSize)
+
+       wb.Write(compressedPayload)
+
+       // Write checksum at created checksum-placeholder
+       frameEndIdx := wb.WriterIndex()
+       checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, frameEndIdx-metadataStartIdx))
+
+       // Set Sizes and checksum in the fixed-size header
+       wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame
+       wb.PutUint32(checksum, checksumIdx)
+}
+
+func encryptPayload(msgMetadata *pb.MessageMetadata,
+       msgCrypto crypto.MessageCrypto,
+       KeyReader crypto.KeyReader,
+       encryptionKeys []string,
+       compressedPayload []byte,
+       cryptoFailureAction int,
+) []byte {
+
+       // encryption is enabled but KeyReader interface is not implemented
+       if KeyReader == nil {
+               // crypto failure action is set to send
+               // so send unencrypted message
+               if cryptoFailureAction == crypto.ProducerCryptoFailureActionSend {
+                       return compressedPayload
+               }
+               return nil
+       }
+
+       // encrypt payload
+       encryptedPayload, err := msgCrypto.Encrypt(encryptionKeys,
+               KeyReader,
+               crypto.NewMessageMetadataSupplier(msgMetadata),
+               compressedPayload)
+
+       if err != nil {

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to