rdhabalia closed pull request #2024: Forward encryption properties with
encrypted payload to consumer
URL: https://github.com/apache/incubator-pulsar/pull/2024
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index ab2af4c6dc..7fb027d472 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -35,6 +35,7 @@
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -54,10 +55,22 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import
org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.impl.ConsumerImpl;
+import org.apache.pulsar.client.impl.EncryptionContext;
+import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
+import org.apache.pulsar.client.impl.MessageCrypto;
import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
+import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
+import org.apache.pulsar.common.compression.CompressionCodec;
+import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -67,8 +80,12 @@
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
public class SimpleProducerConsumerTest extends ProducerConsumerBase {
private static final Logger log =
LoggerFactory.getLogger(SimpleProducerConsumerTest.class);
@@ -2222,10 +2239,13 @@ public EncryptionKeyInfo getPrivateKey(String keyName,
Map<String, String> keyMe
producer2.send(message.getBytes());
}
- Message<byte[]> msg = null;
+ MessageImpl<byte[]> msg = null;
for (int i = 0; i < totalMsg * 2; i++) {
- msg = consumer.receive(5, TimeUnit.SECONDS);
+ msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+ // verify that encrypted message contains encryption-context
+ msg.getEncryptionCtx()
+ .orElseThrow(() -> new
IllegalStateException("encryption-ctx not present for encrypted message"));
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
@@ -2276,7 +2296,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName,
Map<String, String> keyMe
final int totalMsg = 10;
- Message<byte[]> msg = null;
+ MessageImpl<byte[]> msg = null;
Set<String> messageSet = Sets.newHashSet();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("persistent://my-property/use/myenc-ns/myenc-topic1").subscriptionName("my-subscriber-name")
@@ -2307,7 +2327,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName,
Map<String, String> keyMe
// 3. KeyReder is not set by consumer
// Receive should fail since key reader is not setup
- msg = consumer.receive(5, TimeUnit.SECONDS);
+ msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(msg, "Receive should have failed with no keyreader");
// 4. Set consumer config to consume even if decryption fails
@@ -2319,7 +2339,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName,
Map<String, String> keyMe
int msgNum = 0;
try {
// Receive should proceed and deliver encrypted message
- msg = consumer.receive(5, TimeUnit.SECONDS);
+ msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
String expectedMessage = "my-message-" + msgNum++;
Assert.assertNotEquals(receivedMessage, expectedMessage, "Received
encrypted message " + receivedMessage
@@ -2338,7 +2358,10 @@ public EncryptionKeyInfo getPrivateKey(String keyName,
Map<String, String> keyMe
.cryptoKeyReader(new
EncKeyReader()).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
for (int i = msgNum; i < totalMsg - 1; i++) {
- msg = consumer.receive(5, TimeUnit.SECONDS);
+ msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
+ // verify that encrypted message contains encryption-context
+ msg.getEncryptionCtx()
+ .orElseThrow(() -> new
IllegalStateException("encryption-ctx not present for encrypted message"));
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
@@ -2355,12 +2378,136 @@ public EncryptionKeyInfo getPrivateKey(String keyName,
Map<String, String> keyMe
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
// Receive should proceed and discard encrypted messages
- msg = consumer.receive(5, TimeUnit.SECONDS);
+ msg = (MessageImpl<byte[]>) consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(msg, "Message received even aftet
ConsumerCryptoFailureAction.DISCARD is set.");
log.info("-- Exiting {} test --", methodName);
}
+ @Test(groups = "encryption")
+ public void testEncryptionConsumerWithoutCryptoReader() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String encryptionKeyName = "client-rsa.pem";
+ final String encryptionKeyVersion = "1.0";
+ Map<String, String> metadata = Maps.newHashMap();
+ metadata.put("version", encryptionKeyVersion);
+ class EncKeyReader implements CryptoKeyReader {
+ EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
+
+ @Override
+ public EncryptionKeyInfo getPublicKey(String keyName, Map<String,
String> keyMeta) {
+ String CERT_FILE_PATH =
"./src/test/resources/certificate/public-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is
not present or not readable.");
+ }
+ return null;
+ }
+
+ @Override
+ public EncryptionKeyInfo getPrivateKey(String keyName, Map<String,
String> keyMeta) {
+ String CERT_FILE_PATH =
"./src/test/resources/certificate/private-key." + keyName;
+ if (Files.isReadable(Paths.get(CERT_FILE_PATH))) {
+ try {
+
keyInfo.setKey(Files.readAllBytes(Paths.get(CERT_FILE_PATH)));
+ keyInfo.setMetadata(metadata);
+ return keyInfo;
+ } catch (IOException e) {
+ Assert.fail("Failed to read certificate from " +
CERT_FILE_PATH);
+ }
+ } else {
+ Assert.fail("Certificate file " + CERT_FILE_PATH + " is
not present or not readable.");
+ }
+ return null;
+ }
+ }
+
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic("persistent://my-property/my-ns/myrsa-topic1")
+
.subscriptionName("my-subscriber-name").cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+ .subscribe();
+
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic("persistent://my-property/my-ns/myrsa-topic1")
+
.addEncryptionKey(encryptionKeyName).compressionType(CompressionType.LZ4)
+ .cryptoKeyReader(new EncKeyReader()).create();
+
+ String message = "my-message";
+ producer.send(message.getBytes());
+
+ MessageImpl<byte[]> msg = (MessageImpl<byte[]>) consumer.receive(5,
TimeUnit.SECONDS);
+
+ String receivedMessage = decryptMessage(msg, encryptionKeyName, new
EncKeyReader());
+ assertEquals(message, receivedMessage);
+
+ consumer.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ private String decryptMessage(MessageImpl<byte[]> msg, String
encryptionKeyName, CryptoKeyReader reader)
+ throws Exception {
+ Optional<EncryptionContext> ctx = msg.getEncryptionCtx();
+ Assert.assertTrue(ctx.isPresent());
+ EncryptionContext encryptionCtx = ctx
+ .orElseThrow(() -> new IllegalStateException("encryption-ctx
not present for encrypted message"));
+
+ Map<String, EncryptionKey> keys = encryptionCtx.getKeys();
+ assertEquals(keys.size(), 1);
+ EncryptionKey encryptionKey = keys.get(encryptionKeyName);
+ byte[] dataKey = encryptionKey.getKeyValue();
+ Map<String, String> metadata = encryptionKey.getMetadata();
+ String version = metadata.get("version");
+ assertEquals(version, "1.0");
+
+ org.apache.pulsar.common.api.proto.PulsarApi.CompressionType
compressionType = encryptionCtx
+ .getCompressionType();
+ int uncompressedSize = encryptionCtx.getUncompressedMessageSize();
+ byte[] encrParam = encryptionCtx.getParam();
+ String encAlgo = encryptionCtx.getAlgorithm();
+ int batchSize = encryptionCtx.getBatchSize().orElse(0);
+
+ ByteBuf payloadBuf = Unpooled.wrappedBuffer(msg.getData());
+ // try to decrypt
+ MessageCrypto crypto = new MessageCrypto("test", false);
+ Builder metadataBuilder = MessageMetadata.newBuilder();
+ org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys.Builder
encKeyBuilder = EncryptionKeys.newBuilder();
+ encKeyBuilder.setKey(encryptionKeyName);
+ ByteString keyValue = ByteString.copyFrom(dataKey);
+ encKeyBuilder.setValue(keyValue);
+ EncryptionKeys encKey = encKeyBuilder.build();
+ metadataBuilder.setEncryptionParam(ByteString.copyFrom(encrParam));
+ metadataBuilder.setEncryptionAlgo(encAlgo);
+ metadataBuilder.setProducerName("test");
+ metadataBuilder.setSequenceId(123);
+ metadataBuilder.setPublishTime(12333453454L);
+ metadataBuilder.addEncryptionKeys(encKey);
+ metadataBuilder.setCompression(compressionType);
+ metadataBuilder.setUncompressedSize(uncompressedSize);
+ ByteBuf decryptedPayload = crypto.decrypt(metadataBuilder.build(),
payloadBuf, reader);
+
+ // try to uncompress
+ CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(compressionType);
+ ByteBuf uncompressedPayload = codec.decode(decryptedPayload,
uncompressedSize);
+
+ if (batchSize > 0) {
+ PulsarApi.SingleMessageMetadata.Builder
singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
+ .newBuilder();
+ uncompressedPayload =
Commands.deSerializeSingleMessageInBatch(uncompressedPayload,
+ singleMessageMetadataBuilder, 0, batchSize);
+ }
+
+ byte[] data = new byte[uncompressedPayload.readableBytes()];
+ uncompressedPayload.readBytes(data);
+ uncompressedPayload.release();
+ return new String(data);
+ }
+
@Test
public void testConsumerSubscriptionInitialize() throws Exception {
log.info("-- Starting {} test --", methodName);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
index b67383af96..e71b798359 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerCryptoFailureAction.java
@@ -19,11 +19,22 @@
package org.apache.pulsar.client.api;
+import org.apache.pulsar.client.impl.EncryptionContext;
+
public enum ConsumerCryptoFailureAction {
FAIL, // This is the default option to fail consume until crypto succeeds
DISCARD, // Message is silently acknowledged and not delivered to the
application
- CONSUME // Deliver the encrypted message to the application. It's the
application's
- // responsibility to decrypt the message. If message is also
compressed,
- // decompression will fail. If message contain batch messages,
client will
- // not be able to retrieve individual messages in the batch
+ /**
+ *
+ * <pre>
+ * Deliver the encrypted message to the application. It's the
application's responsibility to decrypt the message.
+ * If the message is also compressed, decompression will fail. If the message
contains batch messages, the client will not be
+ * able to retrieve individual messages in the batch.
+ * </pre>
+ *
+ * The delivered encrypted message carries an {@link EncryptionContext} that
holds the encryption and compression
+ * information the application needs to decrypt the consumed message
payload.
+ *
+ */
+ CONSUME;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index d94fe8b62d..e7924cbd0b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -27,17 +27,20 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import java.io.IOException;
import java.util.ArrayList;
+import static java.util.Base64.getEncoder;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -50,6 +53,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerStats;
@@ -59,6 +63,7 @@
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.EncryptionContext.EncryptionKey;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
@@ -67,6 +72,8 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.ValidationError;
import
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
+import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
@@ -76,6 +83,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
public class ConsumerImpl<T> extends ConsumerBase<T> implements
ConnectionHandler.Connection {
private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
@@ -709,11 +717,17 @@ void messageReceived(MessageIdData messageId, ByteBuf
headersAndPayload, ClientC
}
ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId,
msgMetadata, payload, cnx);
+
+ boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
+
if (decryptedPayload == null) {
// Message was discarded or CryptoKeyReader isn't implemented
return;
}
- ByteBuf uncompressedPayload = uncompressPayloadIfNeeded(messageId,
msgMetadata, decryptedPayload, cnx);
+
+ // uncompress decryptedPayload and release decryptedPayload-ByteBuf
+ ByteBuf uncompressedPayload = isMessageUndecryptable ?
decryptedPayload.retain()
+ : uncompressPayloadIfNeeded(messageId, msgMetadata,
decryptedPayload, cnx);
decryptedPayload.release();
if (uncompressedPayload == null) {
// Message was discarded on decompression error
@@ -722,8 +736,11 @@ void messageReceived(MessageIdData messageId, ByteBuf
headersAndPayload, ClientC
final int numMessages = msgMetadata.getNumMessagesInBatch();
- if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()) {
- final MessageImpl<T> message = new MessageImpl<>(msgId,
msgMetadata, uncompressedPayload, cnx, schema);
+ // if message is not decryptable then it can't be parsed as a
batch-message. so, add EncryptionCtx to message
+ // and return undecrypted payload
+ if (isMessageUndecryptable || (numMessages == 1 &&
!msgMetadata.hasNumMessagesInBatch())) {
+ final MessageImpl<T> message = new MessageImpl<>(msgId,
msgMetadata, uncompressedPayload,
+ createEncryptionContext(msgMetadata), cnx, schema);
uncompressedPayload.release();
msgMetadata.recycle();
@@ -895,7 +912,8 @@ void receiveIndividualMessagesFromBatch(MessageMetadata
msgMetadata, ByteBuf unc
BatchMessageIdImpl batchMessageIdImpl = new
BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex(), i, acker);
final MessageImpl<T> message = new
MessageImpl<>(batchMessageIdImpl, msgMetadata,
- singleMessageMetadataBuilder.build(),
singleMessagePayload, cnx, schema);
+ singleMessageMetadataBuilder.build(),
singleMessagePayload,
+ createEncryptionContext(msgMetadata), cnx, schema);
lock.readLock().lock();
try {
if (pendingReceives.isEmpty()) {
@@ -1322,6 +1340,45 @@ private MessageIdImpl getMessageIdImpl(Message<?> msg) {
return messageId;
}
+
+ private boolean isMessageUndecryptable(MessageMetadata msgMetadata) {
+ return (msgMetadata.getEncryptionKeysCount() > 0 &&
conf.getCryptoKeyReader() == null
+ && conf.getCryptoFailureAction() ==
ConsumerCryptoFailureAction.CONSUME);
+ }
+
+ /**
+ * Create EncryptionContext if message payload is encrypted
+ *
+ * @param msgMetadata
+ * @return {@link Optional}<{@link EncryptionContext}>
+ */
+ private Optional<EncryptionContext>
createEncryptionContext(MessageMetadata msgMetadata) {
+
+ EncryptionContext encryptionCtx = null;
+ if (msgMetadata.getEncryptionKeysCount() > 0) {
+ encryptionCtx = new EncryptionContext();
+ Map<String, EncryptionKey> keys =
msgMetadata.getEncryptionKeysList().stream()
+ .collect(
+ Collectors.toMap(EncryptionKeys::getKey,
+ e -> new
EncryptionKey(e.getValue().toByteArray(),
+ e.getMetadataList() != null
+ ?
e.getMetadataList().stream().collect(
+
Collectors.toMap(KeyValue::getKey, KeyValue::getValue))
+ : null)));
+ byte[] encParam = new byte[MessageCrypto.ivLen];
+ msgMetadata.getEncryptionParam().copyTo(encParam, 0);
+ Optional<Integer> batchSize = Optional
+ .ofNullable(msgMetadata.hasNumMessagesInBatch() ?
msgMetadata.getNumMessagesInBatch() : null);
+ encryptionCtx.setKeys(keys);
+ encryptionCtx.setParam(encParam);
+ encryptionCtx.setAlgorithm(msgMetadata.getEncryptionAlgo());
+ encryptionCtx.setCompressionType(msgMetadata.getCompression());
+
encryptionCtx.setUncompressedMessageSize(msgMetadata.getUncompressedSize());
+ encryptionCtx.setBatchSize(batchSize);
+ }
+ return Optional.ofNullable(encryptionCtx);
+ }
+
private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
int messagesFromQueue = 0;
Message<T> peek = incomingMessages.peek();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
new file mode 100644
index 0000000000..ba7018e253
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/EncryptionContext.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+public class EncryptionContext {
+
+ private Map<String, EncryptionKey> keys;
+ private byte[] param;
+ private Map<String, String> metadata;
+ private String algorithm;
+ private CompressionType compressionType;
+ private int uncompressedMessageSize;
+ private Optional<Integer> batchSize;
+
+ @Getter
+ @Setter
+ @NoArgsConstructor
+ @AllArgsConstructor
+ public static class EncryptionKey {
+ private byte[] keyValue;
+ private Map<String, String> metadata;
+ }
+
+}
\ No newline at end of file
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
index 2b57470513..116410873b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
@@ -97,7 +97,7 @@
private static KeyGenerator keyGenerator;
private static final int tagLen = 16 * 8;
- private static final int ivLen = 12;
+ public static final int ivLen = 12;
private byte[] iv = new byte[ivLen];
private Cipher cipher;
MessageDigest digest;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index c019112c33..4adada5e01 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -50,6 +51,7 @@
private ClientCnx cnx;
private ByteBuf payload;
private Schema<T> schema;
+ private Optional<EncryptionContext> encryptionCtx = Optional.empty();
transient private Map<String, String> properties;
@@ -81,6 +83,11 @@
// Constructor for incoming message
MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf
payload, ClientCnx cnx,
Schema<T> schema) {
+ this(messageId, msgMetadata, payload, Optional.empty(), cnx, schema); // never null: getEncryptionCtx() must always return a usable Optional
+ }
+
+ MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf
payload,
+ Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema) {
this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
this.messageId = messageId;
this.cnx = cnx;
@@ -89,6 +96,7 @@
// release, since the Message is passed to the user. Also, the passed
ByteBuf is coming from network and is
// backed by a direct buffer which we could not expose as a byte[]
this.payload = Unpooled.copiedBuffer(payload);
+ this.encryptionCtx = encryptionCtx;
if (msgMetadata.getPropertiesCount() > 0) {
this.properties =
Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
@@ -100,12 +108,14 @@
}
MessageImpl(BatchMessageIdImpl batchMessageIdImpl, MessageMetadata
msgMetadata,
- PulsarApi.SingleMessageMetadata singleMessageMetadata, ByteBuf
payload, ClientCnx cnx, Schema<T> schema) {
+ PulsarApi.SingleMessageMetadata singleMessageMetadata, ByteBuf
payload,
+ Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema) {
this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
this.messageId = batchMessageIdImpl;
this.cnx = cnx;
this.payload = Unpooled.copiedBuffer(payload);
+ this.encryptionCtx = encryptionCtx;
if (singleMessageMetadata.getPropertiesCount() > 0) {
Map<String, String> properties = Maps.newTreeMap();
@@ -319,4 +329,8 @@ public boolean hasReplicateTo() {
void setMessageId(MessageIdImpl messageId) {
this.messageId = messageId;
}
+
+ public Optional<EncryptionContext> getEncryptionCtx() {
+ return encryptionCtx;
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
index 1ab6d04297..8cf0328613 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageParser.java
@@ -25,6 +25,7 @@
import io.netty.buffer.ByteBuf;
import java.io.IOException;
+import java.util.Optional;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
@@ -175,7 +176,7 @@ public static void
receiveIndividualMessagesFromBatch(MessageMetadata msgMetadat
BatchMessageIdImpl batchMessageIdImpl = new
BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), partitionIndex, i, null);
final MessageImpl<?> message = new
MessageImpl<>(batchMessageIdImpl, msgMetadata,
- singleMessageMetadataBuilder.build(),
singleMessagePayload, cnx, null);
+ singleMessageMetadataBuilder.build(),
singleMessagePayload, Optional.empty(), cnx, null);
processor.process(batchMessageIdImpl, message,
singleMessagePayload);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services