This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b1cc78  Forward encryption properties with encrypted payload to 
consumer (#2024)
9b1cc78 is described below

commit 9b1cc78e3e3f9897f931d186afdf5e02dc399136
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sun Jul 1 15:40:02 2018 -0700

    Forward encryption properties with encrypted payload to consumer (#2024)
    
    * Forward encryption properties with encrypted payload to consumer
    
    * add EncryptionCtx to message to store encryption metadata
---
 .../client/api/SimpleProducerConsumerTest.java     | 161 ++++++++++++++++++++-
 .../client/api/ConsumerCryptoFailureAction.java    |  19 ++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  65 ++++++++-
 .../EncryptionContext.java}                        |  41 ++++--
 .../apache/pulsar/client/impl/MessageCrypto.java   |   2 +-
 .../org/apache/pulsar/client/impl/MessageImpl.java |  16 +-
 .../apache/pulsar/client/impl/MessageParser.java   |   3 +-
 7 files changed, 280 insertions(+), 27 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index ab2af4c..7fb027d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -35,6 +35,7 @@ import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -54,10 +55,22 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import 
org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
 import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.EncryptionContext;
+import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.client.impl.MessageCrypto;
 import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -67,8 +80,12 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 public class SimpleProducerConsumerTest extends ProducerConsumerBase {
     private static final Logger log = 
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
 
@@ -2222,10 +2239,13 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
             producer2.send(message.getBytes());
         }
 
-        Message<byte[]> msg = null;
+        MessageImpl<byte[]> msg = null;
 
         for (int i = 0; i < totalMsg * 2; i++) {
-            msg = consumer.receive(5, TimeUnit.SECONDS);
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new 
IllegalStateException("encryption-ctx not present for encrypted message"));
             String receivedMessage = new String(msg.getData());
             log.debug("Received message: [{}]", receivedMessage);
             String expectedMessage = "my-message-" + i;
@@ -2276,7 +2296,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 
         final int totalMsg = 10;
 
-        Message<byte[]> msg = null;
+        MessageImpl<byte[]> msg = null;
         Set<String> messageSet = Sets.newHashSet();
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
                 
.topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
@@ -2307,7 +2327,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 
         // 3. KeyReder is not set by consumer
         // Receive should fail since key reader is not setup
-        msg = consumer.receive(5, TimeUnit.SECONDS);
+        msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
         Assert.assertNull(msg, "Receive should have failed with no keyreader");
 
         // 4. Set consumer config to consume even if decryption fails
@@ -2319,7 +2339,7 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
         int msgNum = 0;
         try {
             // Receive should proceed and deliver encrypted message
-            msg = consumer.receive(5, TimeUnit.SECONDS);
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
             String receivedMessage = new String(msg.getData());
             String expectedMessage = "my-message-" + msgNum++;
             Assert.assertNotEquals(receivedMessage, expectedMessage, "Received 
encrypted message " + receivedMessage
@@ -2338,7 +2358,10 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
                 .cryptoKeyReader(new 
EncKeyReader()).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
         for (int i = msgNum; i < totalMsg - 1; i++) {
-            msg = consumer.receive(5, TimeUnit.SECONDS);
+            msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+            // verify that encrypted message contains encryption-context
+            msg.getEncryptionCtx()
+                    .orElseThrow(() -> new 
IllegalStateException("encryption-ctx not present for encrypted message"));
             String receivedMessage = new String(msg.getData());
             log.debug("Received message: [{}]", receivedMessage);
             String expectedMessage = "my-message-" + i;
@@ -2355,12 +2378,136 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
                 .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
 
         // Receive should proceed and discard encrypted messages
-        msg = consumer.receive(5, TimeUnit.SECONDS);
+        msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
         Assert.assertNull(msg, "Message received even aftet 
ConsumerCryptoFailureAction.DISCARD is set.");
 
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test(groups = "encryption")
+    public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String encryptionKeyName = "client-rsa.pem";
+        final String encryptionKeyVersion = "1.0";
+        Map<String, String> metadata = Maps.newHashMap();
+        metadata.put("version", encryptionKeyVersion);
+        class EncKeyReader implements CryptoKeyReader {
+            EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+            @Override
+            public EncryptionKeyInfo getPublicKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/public-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+
+            @Override
+            public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, 
String> keyMeta) {
+                String CERT_FILE_PATH = 
"./src/test/resources/certificate/private-key." + keyName;
+                if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+                    try {
+                        
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+                        keyInfo.setMetadata(metadata);
+                        return keyInfo;
+                    } catch (IOException e) {
+                        Assert.fail("Failed to read certificate from " + 
CERT_FILE_PATH);
+                    }
+                } else {
+                    Assert.fail("Certificate file " + CERT_FILE_PATH + " is 
not present or not readable.");
+                }
+                return null;
+            }
+        }
+
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+                .subscribe();
+
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+                
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+                .cryptoKeyReader(new EncKeyReader()).create();
+
+        String message = "my-message";
+        producer.send(message.getBytes());
+
+        MessageImpl<byte[]> msg = (MessageImpl<byte[]>) consumer.receive(5, 
TimeUnit.SECONDS);
+
+        String receivedMessage = decryptMessage(msg, encryptionKeyName, new 
EncKeyReader());
+        assertEquals(message, receivedMessage);
+
+        consumer.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+    
+    private String decryptMessage(MessageImpl<byte[]> msg, String 
encryptionKeyName, CryptoKeyReader reader)
+            throws Exception {
+        Optional<EncryptionContext> ctx = msg.getEncryptionCtx();
+        Assert.assertTrue(ctx.isPresent());
+        EncryptionContext encryptionCtx = ctx
+                .orElseThrow(() -> new IllegalStateException("encryption-ctx 
not present for encrypted message"));
+
+        Map<String, EncryptionKey> keys = encryptionCtx.getKeys();
+        assertEquals(keys.size(), 1);
+        EncryptionKey encryptionKey = keys.get(encryptionKeyName);
+        byte[] dataKey = encryptionKey.getKeyValue();
+        Map<String, String> metadata = encryptionKey.getMetadata();
+        String version = metadata.get("version");
+        assertEquals(version, "1.0");
+
+        org.apache.pulsar.common.api.proto.PulsarApi.CompressionType 
compressionType = encryptionCtx
+                .getCompressionType();
+        int uncompressedSize = encryptionCtx.getUncompressedMessageSize();
+        byte[] encrParam = encryptionCtx.getParam();
+        String encAlgo = encryptionCtx.getAlgorithm();
+        int batchSize = encryptionCtx.getBatchSize().orElse(0);
+
+        ByteBuf payloadBuf = Unpooled.wrappedBuffer(msg.getData());
+        // try to decrypt
+        MessageCrypto crypto = new MessageCrypto("test", false);
+        Builder metadataBuilder = MessageMetadata.newBuilder();
+        org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys.Builder 
encKeyBuilder = EncryptionKeys.newBuilder();
+        encKeyBuilder.setKey(encryptionKeyName);
+        ByteString keyValue = ByteString.copyFrom(dataKey);
+        encKeyBuilder.setValue(keyValue);
+        EncryptionKeys encKey = encKeyBuilder.build();
+        metadataBuilder.setEncryptionParam(ByteString.copyFrom(encrParam));
+        metadataBuilder.setEncryptionAlgo(encAlgo);
+        metadataBuilder.setProducerName("test");
+        metadataBuilder.setSequenceId(123);
+        metadataBuilder.setPublishTime(12333453454L);
+        metadataBuilder.addEncryptionKeys(encKey);
+        metadataBuilder.setCompression(compressionType);
+        metadataBuilder.setUncompressedSize(uncompressedSize);
+        ByteBuf decryptedPayload = crypto.decrypt(metadataBuilder.build(), 
payloadBuf, reader);
+
+        // try to uncompress
+        CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(compressionType);
+        ByteBuf uncompressedPayload = codec.decode(decryptedPayload, 
uncompressedSize);
+
+        if (batchSize > 0) {
+            PulsarApi.SingleMessageMetadata.Builder 
singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
+                    .newBuilder();
+            uncompressedPayload = 
Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
+                    singleMessageMetadataBuilder, 0, batchSize);
+        }
+
+        byte[] data = new byte[uncompressedPayload.readableBytes()];
+        uncompressedPayload.readBytes(data);
+        uncompressedPayload.release();
+        return new String(data);
+    }
+
     @Test
     public void testConsumerSubscriptionInitialize() throws Exception {
         log.info("-- Starting {} test --", methodName);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
index b67383a..e71b798 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
@@ -19,11 +19,22 @@
 
 package org.apache.pulsar.client.api;
 
+import org.apache.pulsar.client.impl.EncryptionContext;
+
 public enum ConsumerCryptoFailureAction {
     FAIL, // This is the default option to fail consume until crypto succeeds
     DISCARD, // Message is silently acknowledged and not delivered to the 
application
-    CONSUME // Deliver the encrypted message to the application. It's the 
application's
-            // responsibility to decrypt the message. If message is also 
compressed,
-            // decompression will fail. If message contain batch messages, 
client will
-            // not be able to retrieve individual messages in the batch
+    /**
+     * 
+     * <pre>
+     * Deliver the encrypted message to the application. It's the 
application's responsibility to decrypt the message.
+     * If message is also compressed, decompression will fail. If message 
contain batch messages, client will not be
+     * able to retrieve individual messages in the batch.
+     * </pre>
+     * 
+     * Delivered encrypted message contains {@link EncryptionContext} which 
contains encryption and compression
+     * information in it using which application can decrypt consumed message 
payload.
+     * 
+     */
+    CONSUME;
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d94fe8b..e7924cb 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -27,17 +27,20 @@ import static 
org.apache.pulsar.common.api.Commands.readChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Timeout;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import static java.util.Base64.getEncoder;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -50,6 +53,7 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
+
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerStats;
@@ -59,6 +63,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.PulsarDecoder;
@@ -67,6 +72,8 @@ import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
+import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
@@ -76,6 +83,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 
 public class ConsumerImpl<T> extends ConsumerBase<T> implements 
ConnectionHandler.Connection {
     private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
@@ -709,11 +717,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         }
 
         ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, 
msgMetadata, payload, cnx);
+        
+        boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
+        
         if (decryptedPayload == null) {
             // Message was discarded or CryptoKeyReader isn't implemented
             return;
         }
-        ByteBuf uncompressedPayload = uncompressPayloadIfNeeded(messageId, 
msgMetadata, decryptedPayload, cnx);
+        
+        // uncompress decryptedPayload and release decryptedPayload-ByteBuf
+        ByteBuf uncompressedPayload = isMessageUndecryptable ? 
decryptedPayload.retain() 
+                : uncompressPayloadIfNeeded(messageId, msgMetadata, 
decryptedPayload, cnx);
         decryptedPayload.release();
         if (uncompressedPayload == null) {
             // Message was discarded on decompression error
@@ -722,8 +736,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
 
         final int numMessages = msgMetadata.getNumMessagesInBatch();
 
-        if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
-            final MessageImpl<T> message = new MessageImpl<>(msgId, 
msgMetadata, uncompressedPayload, cnx, schema);
+        // if message is not decryptable then it can't be parsed as a 
batch-message. so, add EncyrptionCtx to message
+        // and return undecrypted payload
+        if (isMessageUndecryptable || (numMessages == 1 && 
!msgMetadata.hasNumMessagesInBatch())) {
+            final MessageImpl<T> message = new MessageImpl<>(msgId, 
msgMetadata, uncompressedPayload,
+                    createEncryptionContext(msgMetadata), cnx, schema);
             uncompressedPayload.release();
             msgMetadata.recycle();
 
@@ -895,7 +912,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
                 BatchMessageIdImpl batchMessageIdImpl = new 
BatchMessageIdImpl(messageId.getLedgerId(),
                         messageId.getEntryId(), getPartitionIndex(), i, acker);
                 final MessageImpl<T> message = new 
MessageImpl<>(batchMessageIdImpl, msgMetadata,
-                        singleMessageMetadataBuilder.build(), 
singleMessagePayload, cnx, schema);
+                        singleMessageMetadataBuilder.build(), 
singleMessagePayload,
+                        createEncryptionContext(msgMetadata), cnx, schema);
                 lock.readLock().lock();
                 try {
                     if (pendingReceives.isEmpty()) {
@@ -1322,6 +1340,45 @@ public class ConsumerImpl<T> extends ConsumerBase<T> 
implements ConnectionHandle
         return messageId;
     }
 
+
+    private boolean isMessageUndecryptable(MessageMetadata msgMetadata) {
+        return (msgMetadata.getEncryptionKeysCount() > 0 && 
conf.getCryptoKeyReader() == null
+                && conf.getCryptoFailureAction() == 
ConsumerCryptoFailureAction.CONSUME);
+    }
+
+    /**
+     * Create EncryptionContext if message payload is encrypted
+     * 
+     * @param msgMetadata
+     * @return {@link Optional}<{@link EncryptionContext}>
+     */
+    private Optional<EncryptionContext> 
createEncryptionContext(MessageMetadata msgMetadata) {
+
+        EncryptionContext encryptionCtx = null;
+        if (msgMetadata.getEncryptionKeysCount() > 0) {
+            encryptionCtx = new EncryptionContext();
+            Map<String, EncryptionKey> keys = 
msgMetadata.getEncryptionKeysList().stream()
+                    .collect(
+                            Collectors.toMap(EncryptionKeys::getKey,
+                                    e -> new 
EncryptionKey(e.getValue().toByteArray(),
+                                            e.getMetadataList() != null
+                                                    ? 
e.getMetadataList().stream().collect(
+                                                            
Collectors.toMap(KeyValue::getKey, KeyValue::getValue))
+                                                    : null)));
+            byte[] encParam = new byte[MessageCrypto.ivLen];
+            msgMetadata.getEncryptionParam().copyTo(encParam, 0);
+            Optional<Integer> batchSize = Optional
+                    .ofNullable(msgMetadata.hasNumMessagesInBatch() ? 
msgMetadata.getNumMessagesInBatch() : null);
+            encryptionCtx.setKeys(keys);
+            encryptionCtx.setParam(encParam);
+            encryptionCtx.setAlgorithm(msgMetadata.getEncryptionAlgo());
+            encryptionCtx.setCompressionType(msgMetadata.getCompression());
+            
encryptionCtx.setUncompressedMessageSize(msgMetadata.getUncompressedSize());
+            encryptionCtx.setBatchSize(batchSize);
+        }
+        return Optional.ofNullable(encryptionCtx);
+    }
+
     private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
         int messagesFromQueue = 0;
         Message<T> peek = incomingMessages.peek();
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
similarity index 50%
copy from 
pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
copy to 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
index b67383a..ba7018e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
@@ -16,14 +16,37 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.pulsar.client.impl;
 
-package org.apache.pulsar.client.api;
+import java.util.Map;
+import java.util.Optional;
 
-public enum ConsumerCryptoFailureAction {
-    FAIL, // This is the default option to fail consume until crypto succeeds
-    DISCARD, // Message is silently acknowledged and not delivered to the 
application
-    CONSUME // Deliver the encrypted message to the application. It's the 
application's
-            // responsibility to decrypt the message. If message is also 
compressed,
-            // decompression will fail. If message contain batch messages, 
client will
-            // not be able to retrieve individual messages in the batch
-}
+import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class EncryptionContext {
+
+    private Map<String, EncryptionKey> keys;
+    private byte[] param;
+    private Map<String, String> metadata;
+    private String algorithm;
+    private CompressionType compressionType;
+    private int uncompressedMessageSize;
+    private Optional<Integer> batchSize;
+
+    @Getter
+    @Setter
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class EncryptionKey {
+        private byte[] keyValue;
+        private Map<String, String> metadata;
+    }
+
+}
\ No newline at end of file
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
index 2b57470..1164108 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
@@ -97,7 +97,7 @@ public class MessageCrypto {
 
     private static KeyGenerator keyGenerator;
     private static final int tagLen = 16 * 8;
-    private static final int ivLen = 12;
+    public static final int ivLen = 12;
     private byte[] iv = new byte[ivLen];
     private Cipher cipher;
     MessageDigest digest;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index c019112..4adada5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -50,6 +51,7 @@ public class MessageImpl<T> extends MessageRecordImpl<T, 
MessageId> {
     private ClientCnx cnx;
     private ByteBuf payload;
     private Schema<T> schema;
+    private Optional<EncryptionContext> encryptionCtx = Optional.empty();
 
     transient private Map<String, String> properties;
 
@@ -81,6 +83,11 @@ public class MessageImpl<T> extends MessageRecordImpl<T, 
MessageId> {
     // Constructor for incoming message
     MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf 
payload, ClientCnx cnx,
             Schema<T> schema) {
+        this(messageId, msgMetadata, payload, null, cnx, schema);
+    }
+    
+    MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf 
payload,
+            Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, 
Schema<T> schema) {
         this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
         this.messageId = messageId;
         this.cnx = cnx;
@@ -89,6 +96,7 @@ public class MessageImpl<T> extends MessageRecordImpl<T, 
MessageId> {
         // release, since the Message is passed to the user. Also, the passed 
ByteBuf is coming from network and is
         // backed by a direct buffer which we could not expose as a byte[]
         this.payload = Unpooled.copiedBuffer(payload);
+        this.encryptionCtx = encryptionCtx;
 
         if (msgMetadata.getPropertiesCount() > 0) {
             this.properties = 
Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
@@ -100,12 +108,14 @@ public class MessageImpl<T> extends MessageRecordImpl<T, 
MessageId> {
     }
 
     MessageImpl(BatchMessageIdImpl batchMessageIdImpl, MessageMetadata 
msgMetadata,
-            PulsarApi.SingleMessageMetadata singleMessageMetadata, ByteBuf 
payload, ClientCnx cnx, Schema<T> schema) {
+            PulsarApi.SingleMessageMetadata singleMessageMetadata, ByteBuf 
payload,
+            Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, 
Schema<T> schema) {
         this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
         this.messageId = batchMessageIdImpl;
         this.cnx = cnx;
 
         this.payload = Unpooled.copiedBuffer(payload);
+        this.encryptionCtx = encryptionCtx;
 
         if (singleMessageMetadata.getPropertiesCount() > 0) {
             Map<String, String> properties = Maps.newTreeMap();
@@ -319,4 +329,8 @@ public class MessageImpl<T> extends MessageRecordImpl<T, 
MessageId> {
     void setMessageId(MessageIdImpl messageId) {
         this.messageId = messageId;
     }
+    
+    public Optional<EncryptionContext> getEncryptionCtx() {
+        return encryptionCtx;
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
index 1ab6d04..8cf0328 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
@@ -25,6 +25,7 @@ import static 
org.apache.pulsar.common.api.Commands.readChecksum;
 import io.netty.buffer.ByteBuf;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
@@ -175,7 +176,7 @@ public class MessageParser {
                 BatchMessageIdImpl batchMessageIdImpl = new 
BatchMessageIdImpl(messageId.getLedgerId(),
                         messageId.getEntryId(), partitionIndex, i, null);
                 final MessageImpl<?> message = new 
MessageImpl<>(batchMessageIdImpl, msgMetadata,
-                        singleMessageMetadataBuilder.build(), 
singleMessagePayload, cnx, null);
+                        singleMessageMetadataBuilder.build(), 
singleMessagePayload, Optional.empty(), cnx, null);
 
                 processor.process(batchMessageIdImpl, message, 
singleMessagePayload);
 

Reply via email to