This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9b1cc78 Forward encryption properties with encrypted payload to
consumer (#2024)
9b1cc78 is described below
commit 9b1cc78e3e3f9897f931d186afdf5e02dc399136
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Sun Jul 1 15:40:02 2018 -0700
Forward encryption properties with encrypted payload to consumer (#2024)
* Forward encryption properties with encrypted payload to consumer
* add EncryptionCtx to message to store encryption metadata
---
.../client/api/SimpleProducerConsumerTest.java | 161 ++++++++++++++++++++-
.../client/api/ConsumerCryptoFailureAction.java | 19 ++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 65 ++++++++-
.../EncryptionContext.java} | 41 ++++--
.../apache/pulsar/client/impl/MessageCrypto.java | 2 +-
.../org/apache/pulsar/client/impl/MessageImpl.java | 16 +-
.../apache/pulsar/client/impl/MessageParser.java | 3 +-
7 files changed, 280 insertions(+), 27 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index ab2af4c..7fb027d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -35,6 +35,7 @@ import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -54,10 +55,22 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import
org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.EncryptionContext;
+import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.client.impl.MessageCrypto;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -67,8 +80,12 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
public class SimpleProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
@@ -2222,10 +2239,13 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
producer2.send(message.getBytes());
}
- Message<byte[]> msg = null;
+ MessageImpl<byte[]> msg = null;
for (int i = 0; i < totalMsg * 2; i++) {
- msg = consumer.receive(5, TimeUnit.SECONDS);
+ msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+ // verify that encrypted message contains encryption-context
+ msg.getEncryptionCtx()
+ .orElseThrow(() -> new
IllegalStateException("encryption-ctx not present for encrypted message"));
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
@@ -2276,7 +2296,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
final int totalMsg = 10;
- Message<byte[]> msg = null;
+ MessageImpl<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
@@ -2307,7 +2327,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
// 3. KeyReder is not set by consumer
// Receive should fail since key reader is not setup
- msg = consumer.receive(5, TimeUnit.SECONDS);
+ msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(msg, "Receive should have failed with no keyreader");
// 4. Set consumer config to consume even if decryption fails
@@ -2319,7 +2339,7 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
int msgNum = 0;
try {
// Receive should proceed and deliver encrypted message
- msg = consumer.receive(5, TimeUnit.SECONDS);
+ msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
String expectedMessage = "my-message-" + msgNum++;
Assert.assertNotEquals(receivedMessage, expectedMessage, "Received
encrypted message " + receivedMessage
@@ -2338,7 +2358,10 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
.cryptoKeyReader(new
EncKeyReader()).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
for (int i = msgNum; i < totalMsg - 1; i++) {
- msg = consumer.receive(5, TimeUnit.SECONDS);
+ msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+ // verify that encrypted message contains encryption-context
+ msg.getEncryptionCtx()
+ .orElseThrow(() -> new
IllegalStateException("encryption-ctx not present for encrypted message"));
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
@@ -2355,12 +2378,136 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
// Receive should proceed and discard encrypted messages
- msg = consumer.receive(5, TimeUnit.SECONDS);
+ msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(msg, "Message received even aftet
ConsumerCryptoFailureAction.DISCARD is set.");
log.info("-- Exiting {} test --", methodName);
}
+ @Test(groups = "encryption")
+ public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String encryptionKeyName = "client-rsa.pem";
+ final String encryptionKeyVersion = "1.0";
+ Map<String, String> metadata = Maps.newHashMap();
+ metadata.put("version", encryptionKeyVersion);
+ class EncKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String,
String> keyMeta) {
+ String CERT_FILE_PATH =
"./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is
not present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMeta) {
+ String CERT_FILE_PATH =
"./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is
not present or not readable.");
+ }
+ return null;
+ }
+ }
+
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+ .subscribe();
+
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+ .cryptoKeyReader(new EncKeyReader()).create();
+
+ String message = "my-message";
+ producer.send(message.getBytes());
+
+ MessageImpl<byte[]> msg = (MessageImpl<byte[]>) consumer.receive(5,
TimeUnit.SECONDS);
+
+ String receivedMessage = decryptMessage(msg, encryptionKeyName, new
EncKeyReader());
+ assertEquals(message, receivedMessage);
+
+ consumer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+    /**
+     * Decrypts the payload of a message that was delivered still encrypted, using only what its
+     * {@link EncryptionContext} exposes, then decompresses it and, for a batch, extracts the first entry.
+     *
+     * @param msg
+     *            message whose payload is still encrypted
+     * @param encryptionKeyName
+     *            name under which the key is looked up in the encryption context
+     * @param reader
+     *            key reader handed to {@link MessageCrypto} for decryption
+     * @return the recovered payload as a String
+     * @throws Exception
+     *             if decryption or decompression fails
+     */
+    private String decryptMessage(MessageImpl<byte[]> msg, String encryptionKeyName, CryptoKeyReader reader)
+            throws Exception {
+        EncryptionContext encryptionCtx = msg.getEncryptionCtx()
+                .orElseThrow(() -> new IllegalStateException("encryption-ctx not present for encrypted message"));
+
+        Map<String, EncryptionKey> keys = encryptionCtx.getKeys();
+        assertEquals(keys.size(), 1);
+        EncryptionKey encryptionKey = keys.get(encryptionKeyName);
+        // fail with a readable assertion instead of a NullPointerException when the expected key is missing
+        Assert.assertNotNull(encryptionKey, "encryption key " + encryptionKeyName + " not present in encryption-ctx");
+        byte[] dataKey = encryptionKey.getKeyValue();
+        Map<String, String> metadata = encryptionKey.getMetadata();
+        String version = metadata.get("version");
+        assertEquals(version, "1.0");
+
+        org.apache.pulsar.common.api.proto.PulsarApi.CompressionType compressionType = encryptionCtx
+                .getCompressionType();
+        int uncompressedSize = encryptionCtx.getUncompressedMessageSize();
+        byte[] encrParam = encryptionCtx.getParam();
+        String encAlgo = encryptionCtx.getAlgorithm();
+        int batchSize = encryptionCtx.getBatchSize().orElse(0);
+
+        ByteBuf payloadBuf = Unpooled.wrappedBuffer(msg.getData());
+        // try to decrypt: rebuild just enough message metadata for MessageCrypto from the encryption context
+        MessageCrypto crypto = new MessageCrypto("test", false);
+        Builder metadataBuilder = MessageMetadata.newBuilder();
+        org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys.Builder encKeyBuilder = EncryptionKeys.newBuilder();
+        encKeyBuilder.setKey(encryptionKeyName);
+        ByteString keyValue = ByteString.copyFrom(dataKey);
+        encKeyBuilder.setValue(keyValue);
+        EncryptionKeys encKey = encKeyBuilder.build();
+        metadataBuilder.setEncryptionParam(ByteString.copyFrom(encrParam));
+        metadataBuilder.setEncryptionAlgo(encAlgo);
+        metadataBuilder.setProducerName("test");
+        metadataBuilder.setSequenceId(123);
+        metadataBuilder.setPublishTime(12333453454L);
+        metadataBuilder.addEncryptionKeys(encKey);
+        metadataBuilder.setCompression(compressionType);
+        metadataBuilder.setUncompressedSize(uncompressedSize);
+        ByteBuf decryptedPayload = crypto.decrypt(metadataBuilder.build(), payloadBuf, reader);
+        Assert.assertNotNull(decryptedPayload, "failed to decrypt payload");
+
+        // try to uncompress
+        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
+        ByteBuf uncompressedPayload;
+        try {
+            uncompressedPayload = codec.decode(decryptedPayload, uncompressedSize);
+        } finally {
+            // mirrors ConsumerImpl, which releases the decrypted buffer once decompression has been attempted
+            decryptedPayload.release();
+        }
+
+        if (batchSize > 0) {
+            PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
+                    .newBuilder();
+            uncompressedPayload = Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
+                    singleMessageMetadataBuilder, 0, batchSize);
+        }
+
+        byte[] data = new byte[uncompressedPayload.readableBytes()];
+        uncompressedPayload.readBytes(data);
+        uncompressedPayload.release();
+        return new String(data);
+    }
+
@Test
public void testConsumerSubscriptionInitialize() throws Exception {
log.info("-- Starting {} test --", methodName);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
index b67383a..e71b798 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
@@ -19,11 +19,22 @@
package org.apache.pulsar.client.api;
+import org.apache.pulsar.client.impl.EncryptionContext;
+
public enum ConsumerCryptoFailureAction {
FAIL, // This is the default option to fail consume until crypto succeeds
DISCARD, // Message is silently acknowledged and not delivered to the
application
- CONSUME // Deliver the encrypted message to the application. It's the
application's
- // responsibility to decrypt the message. If message is also
compressed,
- // decompression will fail. If message contain batch messages,
client will
- // not be able to retrieve individual messages in the batch
+ /**
+ *
+ * <pre>
+ * Deliver the encrypted message to the application. It's the application's responsibility to decrypt the message.
+ * If the message is also compressed, decompression will fail. If the message contains batch messages, the client
+ * will not be able to retrieve individual messages in the batch.
+ * </pre>
+ *
+ * The delivered encrypted message carries an {@link EncryptionContext} holding the encryption and compression
+ * information that the application can use to decrypt the consumed message payload.
+ *
+ */
+ CONSUME;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d94fe8b..e7924cb 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -27,17 +27,20 @@ import static
org.apache.pulsar.common.api.Commands.readChecksum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import java.io.IOException;
import java.util.ArrayList;
+import static java.util.Base64.getEncoder;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -50,6 +53,7 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerStats;
@@ -59,6 +63,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
@@ -67,6 +72,8 @@ import
org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
+import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
@@ -76,6 +83,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandler.Connection {
private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
@@ -709,11 +717,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
}
ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId,
msgMetadata, payload, cnx);
+
+ boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
+
if (decryptedPayload == null) {
// Message was discarded or CryptoKeyReader isn't implemented
return;
}
- ByteBuf uncompressedPayload = uncompressPayloadIfNeeded(messageId,
msgMetadata, decryptedPayload, cnx);
+
+ // uncompress decryptedPayload and release decryptedPayload-ByteBuf
+ ByteBuf uncompressedPayload = isMessageUndecryptable ?
decryptedPayload.retain()
+ : uncompressPayloadIfNeeded(messageId, msgMetadata,
decryptedPayload, cnx);
decryptedPayload.release();
if (uncompressedPayload == null) {
// Message was discarded on decompression error
@@ -722,8 +736,11 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
final int numMessages = msgMetadata.getNumMessagesInBatch();
- if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
- final MessageImpl<T> message = new MessageImpl<>(msgId,
msgMetadata, uncompressedPayload, cnx, schema);
+ // if message is not decryptable then it can't be parsed as a
batch-message. so, add EncryptionCtx to message
+ // and return undecrypted payload
+ if (isMessageUndecryptable || (numMessages == 1 &&
!msgMetadata.hasNumMessagesInBatch())) {
+ final MessageImpl<T> message = new MessageImpl<>(msgId,
msgMetadata, uncompressedPayload,
+ createEncryptionContext(msgMetadata), cnx, schema);
uncompressedPayload.release();
msgMetadata.recycle();
@@ -895,7 +912,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
BatchMessageIdImpl batchMessageIdImpl = new
BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex(), i, acker);
final MessageImpl<T> message = new
MessageImpl<>(batchMessageIdImpl, msgMetadata,
- singleMessageMetadataBuilder.build(),
singleMessagePayload, cnx, schema);
+ singleMessageMetadataBuilder.build(),
singleMessagePayload,
+ createEncryptionContext(msgMetadata), cnx, schema);
lock.readLock().lock();
try {
if (pendingReceives.isEmpty()) {
@@ -1322,6 +1340,45 @@ public class ConsumerImpl<T> extends ConsumerBase<T>
implements ConnectionHandle
return messageId;
}
+
+    /**
+     * Tells whether the payload has to be handed to the application still encrypted: the metadata lists at least
+     * one encryption key, no CryptoKeyReader is configured on this consumer, and the configured crypto failure
+     * action is {@link ConsumerCryptoFailureAction#CONSUME}.
+     *
+     * @param msgMetadata
+     *            metadata of the received message
+     * @return true only when all three conditions hold
+     */
+    private boolean isMessageUndecryptable(MessageMetadata msgMetadata) {
+        if (msgMetadata.getEncryptionKeysCount() <= 0) {
+            return false;
+        }
+        if (conf.getCryptoKeyReader() != null) {
+            return false;
+        }
+        return ConsumerCryptoFailureAction.CONSUME == conf.getCryptoFailureAction();
+    }
+
+ /**
+ * Create an EncryptionContext when the message metadata carries at least one encryption key.
+ *
+ * The context is filled from the metadata with: the encryption keys mapped by key name (each with its value and
+ * its key/value metadata), the encryption param copied into a buffer of MessageCrypto.ivLen bytes, the
+ * encryption algorithm, the compression type, the uncompressed size and the number of messages in the batch
+ * (left empty when the metadata does not have that field).
+ *
+ * @param msgMetadata
+ *            metadata of the received message
+ * @return {@link Optional}<{@link EncryptionContext}>, empty when the metadata has no encryption keys
+ */
+ private Optional<EncryptionContext>
createEncryptionContext(MessageMetadata msgMetadata) {
+
+ EncryptionContext encryptionCtx = null;
+ if (msgMetadata.getEncryptionKeysCount() > 0) {
+ encryptionCtx = new EncryptionContext();
+ Map<String, EncryptionKey> keys =
msgMetadata.getEncryptionKeysList().stream()
+ .collect(
+ Collectors.toMap(EncryptionKeys::getKey,
+ e -> new
EncryptionKey(e.getValue().toByteArray(),
+ e.getMetadataList() != null
+ ?
e.getMetadataList().stream().collect(
+
Collectors.toMap(KeyValue::getKey, KeyValue::getValue))
+ : null)));
+ byte[] encParam = new byte[MessageCrypto.ivLen];
+ msgMetadata.getEncryptionParam().copyTo(encParam, 0);
+ Optional<Integer> batchSize = Optional
+ .ofNullable(msgMetadata.hasNumMessagesInBatch() ?
msgMetadata.getNumMessagesInBatch() : null);
+ encryptionCtx.setKeys(keys);
+ encryptionCtx.setParam(encParam);
+ encryptionCtx.setAlgorithm(msgMetadata.getEncryptionAlgo());
+ encryptionCtx.setCompressionType(msgMetadata.getCompression());
+
encryptionCtx.setUncompressedMessageSize(msgMetadata.getUncompressedSize());
+ encryptionCtx.setBatchSize(batchSize);
+ }
+ return Optional.ofNullable(encryptionCtx);
+ }
+
private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
int messagesFromQueue = 0;
Message<T> peek = incomingMessages.peek();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
similarity index 50%
copy from
pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
copy to
pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
index b67383a..ba7018e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
@@ -16,14 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.pulsar.client.impl;
-package org.apache.pulsar.client.api;
+import java.util.Map;
+import java.util.Optional;
-public enum ConsumerCryptoFailureAction {
- FAIL, // This is the default option to fail consume until crypto succeeds
- DISCARD, // Message is silently acknowledged and not delivered to the
application
- CONSUME // Deliver the encrypted message to the application. It's the
application's
- // responsibility to decrypt the message. If message is also
compressed,
- // decompression will fail. If message contain batch messages,
client will
- // not be able to retrieve individual messages in the batch
-}
+import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * Encryption and compression details of a received message, attached to the message so that an application can
+ * decrypt (and decompress) a payload that the client delivered still encrypted.
+ */
+@Getter
+@Setter
+public class EncryptionContext {
+
+    /** Encryption keys of the message, mapped by key name. */
+    private Map<String, EncryptionKey> keys;
+    /** Encryption param taken from the message metadata. */
+    private byte[] param;
+    /** NOTE(review): no code in this change populates this field - confirm its intended use. */
+    private Map<String, String> metadata;
+    /** Encryption algorithm name taken from the message metadata. */
+    private String algorithm;
+    /** Compression applied to the payload. */
+    private CompressionType compressionType;
+    /** Size of the payload before compression. */
+    private int uncompressedMessageSize;
+    /**
+     * Number of messages in the batch, empty when the message metadata carries no batch count. Initialised to an
+     * empty Optional so the getter never returns null before a setter has been called.
+     */
+    private Optional<Integer> batchSize = Optional.empty();
+
+    /** A single encryption key entry: its value plus the key/value metadata stored with it. */
+    @Getter
+    @Setter
+    @NoArgsConstructor
+    @AllArgsConstructor
+    public static class EncryptionKey {
+        private byte[] keyValue;
+        private Map<String, String> metadata;
+    }
+
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
index 2b57470..1164108 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
@@ -97,7 +97,7 @@ public class MessageCrypto {
private static KeyGenerator keyGenerator;
private static final int tagLen = 16 * 8;
- private static final int ivLen = 12;
+ public static final int ivLen = 12;
private byte[] iv = new byte[ivLen];
private Cipher cipher;
MessageDigest digest;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index c019112..4adada5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -25,6 +25,7 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -50,6 +51,7 @@ public class MessageImpl<T> extends MessageRecordImpl<T,
MessageId> {
private ClientCnx cnx;
private ByteBuf payload;
private Schema<T> schema;
+ private Optional<EncryptionContext> encryptionCtx = Optional.empty();
transient private Map<String, String> properties;
@@ -81,6 +83,11 @@ public class MessageImpl<T> extends MessageRecordImpl<T,
MessageId> {
// Constructor for incoming message
MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf
payload, ClientCnx cnx,
Schema<T> schema) {
+ // Delegate with an empty Optional rather than null, so getEncryptionCtx() never hands callers a null Optional.
+ this(messageId, msgMetadata, payload, Optional.empty(), cnx, schema);
+ }
+
+ MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf
payload,
+ Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema) {
this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
this.messageId = messageId;
this.cnx = cnx;
@@ -89,6 +96,7 @@ public class MessageImpl<T> extends MessageRecordImpl<T,
MessageId> {
// release, since the Message is passed to the user. Also, the passed
ByteBuf is coming from network and is
// backed by a direct buffer which we could not expose as a byte[]
this.payload = Unpooled.copiedBuffer(payload);
+ this.encryptionCtx = encryptionCtx;
if (msgMetadata.getPropertiesCount() > 0) {
this.properties =
Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
@@ -100,12 +108,14 @@ public class MessageImpl<T> extends MessageRecordImpl<T,
MessageId> {
}
MessageImpl(BatchMessageIdImpl batchMessageIdImpl, MessageMetadata
msgMetadata,
- PulsarApi.SingleMessageMetadata singleMessageMetadata, ByteBuf
payload, ClientCnx cnx, Schema<T> schema) {
+ PulsarApi.SingleMessageMetadata singleMessageMetadata, ByteBuf
payload,
+ Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema) {
this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
this.messageId = batchMessageIdImpl;
this.cnx = cnx;
this.payload = Unpooled.copiedBuffer(payload);
+ this.encryptionCtx = encryptionCtx;
if (singleMessageMetadata.getPropertiesCount() > 0) {
Map<String, String> properties = Maps.newTreeMap();
@@ -319,4 +329,8 @@ public class MessageImpl<T> extends MessageRecordImpl<T,
MessageId> {
void setMessageId(MessageIdImpl messageId) {
this.messageId = messageId;
}
+
+    /**
+     * Returns the encryption metadata attached to this message.
+     *
+     * @return the {@link EncryptionContext} supplied at construction time, or an empty Optional when none was
+     *         supplied (including when a constructor was handed {@code null}); never {@code null}
+     */
+    public Optional<EncryptionContext> getEncryptionCtx() {
+        return encryptionCtx != null ? encryptionCtx : Optional.empty();
+    }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
index 1ab6d04..8cf0328 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
@@ -25,6 +25,7 @@ import static
org.apache.pulsar.common.api.Commands.readChecksum;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
+import java.util.Optional;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
@@ -175,7 +176,7 @@ public class MessageParser {
BatchMessageIdImpl batchMessageIdImpl = new
BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), partitionIndex, i, null);
final MessageImpl<?> message = new
MessageImpl<>(batchMessageIdImpl, msgMetadata,
- singleMessageMetadataBuilder.build(),
singleMessagePayload, cnx, null);
+ singleMessageMetadataBuilder.build(),
singleMessagePayload, Optional.empty(), cnx, null);
processor.process(batchMessageIdImpl, message,
singleMessagePayload);