This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dccc06bf50b [fix][broker] support peek-message for compressed and
encrypted messages (#23234)
dccc06bf50b is described below
commit dccc06bf50bb5ca510b39167908c02d2b4602ca5
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Aug 29 22:25:58 2024 -0700
[fix][broker] support peek-message for compressed and encrypted messages
(#23234)
---
.../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 6 +++++-
.../org/apache/pulsar/client/api/SimpleProducerConsumerTest.java | 5 +++++
2 files changed, 10 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 40e74f83e98..b2d455f645d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;
+import static org.apache.pulsar.common.api.proto.CompressionType.NONE;
import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic;
import static
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
import static
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
@@ -2999,6 +3000,7 @@ public class PersistentTopicsBase extends AdminResource {
checkNotNull(entry);
Position pos = entry.getPosition();
ByteBuf metadataAndPayload = entry.getDataBuffer();
+ boolean isEncrypted = false;
long totalSize = metadataAndPayload.readableBytes();
BrokerEntryMetadata brokerEntryMetadata =
Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
@@ -3070,6 +3072,7 @@ public class PersistentTopicsBase extends AdminResource {
for (EncryptionKeys encryptionKeys : metadata.getEncryptionKeysList())
{
responseBuilder.header("X-Pulsar-Base64-encryption-keys",
Base64.getEncoder().encodeToString(encryptionKeys.toByteArray()));
+ isEncrypted = true;
}
if (metadata.hasEncryptionParam()) {
responseBuilder.header("X-Pulsar-Base64-encryption-param",
@@ -3123,7 +3126,8 @@ public class PersistentTopicsBase extends AdminResource {
responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);
// Decode if needed
- CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
+ CompressionCodec codec = CompressionCodecProvider
+ .getCompressionCodec(isEncrypted ? NONE :
metadata.getCompression());
ByteBuf uncompressedPayload = codec.decode(metadataAndPayload,
metadata.getUncompressedSize());
// Copy into a heap buffer for output stream compatibility
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index a9d97b7febd..61dd33be64a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2712,12 +2712,17 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
Producer<byte[]> cryptoProducer = pulsarClient.newProducer()
.topic(topicName).addEncryptionKey("client-ecdsa.pem")
+ .compressionType(CompressionType.LZ4)
.cryptoKeyReader(new EncKeyReader()).create();
for (int i = 0; i < totalMsg; i++) {
String message = "my-message-" + i;
cryptoProducer.send(message.getBytes());
}
+ // admin api should be able to fetch compressed and encrypted message
+ List<Message<byte[]>> msgs = admin.topics().peekMessages(topicName,
"my-subscriber-name", 1);
+ assertNotNull(msgs);
+
Message<byte[]> msg;
msg = normalConsumer.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS,
TimeUnit.MILLISECONDS);