This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dccc06bf50b [fix][broker] support peek-message for compressed and encrypted messages (#23234)
dccc06bf50b is described below

commit dccc06bf50bb5ca510b39167908c02d2b4602ca5
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Thu Aug 29 22:25:58 2024 -0700

    [fix][broker] support peek-message for compressed and encrypted messages (#23234)
---
 .../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java   | 6 +++++-
 .../org/apache/pulsar/client/api/SimpleProducerConsumerTest.java    | 5 +++++
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 40e74f83e98..b2d455f645d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
+import static org.apache.pulsar.common.api.proto.CompressionType.NONE;
 import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
 import static 
org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
@@ -2999,6 +3000,7 @@ public class PersistentTopicsBase extends AdminResource {
         checkNotNull(entry);
         Position pos = entry.getPosition();
         ByteBuf metadataAndPayload = entry.getDataBuffer();
+        boolean isEncrypted = false;
 
         long totalSize = metadataAndPayload.readableBytes();
         BrokerEntryMetadata brokerEntryMetadata = 
Commands.peekBrokerEntryMetadataIfExist(metadataAndPayload);
@@ -3070,6 +3072,7 @@ public class PersistentTopicsBase extends AdminResource {
         for (EncryptionKeys encryptionKeys : metadata.getEncryptionKeysList()) 
{
             responseBuilder.header("X-Pulsar-Base64-encryption-keys",
                     
Base64.getEncoder().encodeToString(encryptionKeys.toByteArray()));
+            isEncrypted = true;
         }
         if (metadata.hasEncryptionParam()) {
             responseBuilder.header("X-Pulsar-Base64-encryption-param",
@@ -3123,7 +3126,8 @@ public class PersistentTopicsBase extends AdminResource {
         responseBuilder.header("X-Pulsar-txn-uncommitted", isTxnUncommitted);
 
         // Decode if needed
-        CompressionCodec codec = 
CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
+        CompressionCodec codec = CompressionCodecProvider
+                .getCompressionCodec(isEncrypted ? NONE : 
metadata.getCompression());
         ByteBuf uncompressedPayload = codec.decode(metadataAndPayload, 
metadata.getUncompressedSize());
 
         // Copy into a heap buffer for output stream compatibility
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index a9d97b7febd..61dd33be64a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2712,12 +2712,17 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
 
         Producer<byte[]> cryptoProducer = pulsarClient.newProducer()
                 .topic(topicName).addEncryptionKey("client-ecdsa.pem")
+                .compressionType(CompressionType.LZ4)
                 .cryptoKeyReader(new EncKeyReader()).create();
         for (int i = 0; i < totalMsg; i++) {
             String message = "my-message-" + i;
             cryptoProducer.send(message.getBytes());
         }
 
+        // admin api should be able to fetch compressed and encrypted message
+        List<Message<byte[]>> msgs = admin.topics().peekMessages(topicName, 
"my-subscriber-name", 1);
+        assertNotNull(msgs);
+
         Message<byte[]> msg;
 
         msg = normalConsumer.receive(RECEIVE_TIMEOUT_MEDIUM_MILLIS, 
TimeUnit.MILLISECONDS);

Reply via email to