This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d3bc1eb9158f5f218d33e028fd5b6882928fc73e
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Thu Jul 8 05:54:11 2021 +0900

    Enable peeking encrypted batch messages (#11244)
    
    
    (cherry picked from commit d01ecb041847070eb1d011ebccadab5ee1d40ca3)
---
 .../broker/admin/impl/PersistentTopicsBase.java    |  1 +
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 43 ++++++++++++++++++++++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 11 +++++-
 3 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 781de11..234f153 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2503,6 +2503,7 @@ public class PersistentTopicsBase extends AdminResource {
             responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
             responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", Integer.toString(metadata.getChunkId()));
         }
+        responseBuilder.header("X-Pulsar-Is-Encrypted", metadata.getEncryptionKeysCount() > 0);
 
         // Decode if needed
         CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index fb8f799..82d7b1c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.client.admin.internal.TenantsImpl;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -2997,6 +2998,48 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         final String topicName = "non-persistent://prop-xyz/ns1/testTruncateTopic-" + UUID.randomUUID().toString();
         admin.topics().createNonPartitionedTopic(topicName);
         assertThrows(() -> {admin.topics().truncate(topicName);});
+    }
+
+    @Test(timeOut = 20000)
+    public void testPeekEncryptedMessages() throws Exception {
+        final String topicName = "persistent://prop-xyz/ns1/testPeekEncryptedMessages-" + UUID.randomUUID().toString();
+        final String subName = "my-sub";
+
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscription(topicName, subName, MessageId.latest);
+
+        final Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(true)
+                .addEncryptionKey("my-app-key")
+                .defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem")
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            producer.send(("message-" + i).getBytes());
+        }
+        producer.close();
+
+        final List<Message<byte[]>> peekedMessages = admin.topics().peekMessages(topicName, subName, 5);
+        assertEquals(peekedMessages.size(), 5);
+
+        final Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subName)
+                .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+                .subscribe();
 
+        final List<Message<byte[]>> receivedMessages = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+            receivedMessages.add(msg);
+            consumer.acknowledge(msg);
+        }
+        consumer.unsubscribe();
+
+        for (int i = 0; i < 5; i++) {
+            assertEquals(peekedMessages.get(i).getMessageId(), receivedMessages.get(i).getMessageId());
+            assertEquals(peekedMessages.get(i).getData(), receivedMessages.get(i).getData());
+        }
     }
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 877448d..647804f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1497,8 +1497,17 @@ public class TopicsImpl extends BaseResource implements Topics {
             }
 
             tmp = headers.getFirst(BATCH_HEADER);
-            if (response.getHeaderString(BATCH_HEADER) != null) {
+            if (tmp != null) {
                 properties.put(BATCH_HEADER, (String) tmp);
+            }
+
+            boolean isEncrypted = false;
+            tmp = headers.getFirst("X-Pulsar-Is-Encrypted");
+            if (tmp != null) {
+                isEncrypted = Boolean.parseBoolean(tmp.toString());
+            }
+
+            if (!isEncrypted && response.getHeaderString(BATCH_HEADER) != null) {
                 return getIndividualMsgsFromBatch(topic, msgId, data, properties, messageMetadata);
             }
 

Reply via email to