This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d01ecb0 Enable peeking encrypted batch messages (#11244)
d01ecb0 is described below
commit d01ecb041847070eb1d011ebccadab5ee1d40ca3
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Thu Jul 8 05:54:11 2021 +0900
Enable peeking encrypted batch messages (#11244)
---
.../broker/admin/impl/PersistentTopicsBase.java | 1 +
.../apache/pulsar/broker/admin/AdminApiTest.java | 43 ++++++++++++++++++++++
.../pulsar/client/admin/internal/TopicsImpl.java | 11 +++++-
3 files changed, 54 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d85ea53..a3c4c3e 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2519,6 +2519,7 @@ public class PersistentTopicsBase extends AdminResource {
responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS",
Integer.toString(metadata.getNumChunksFromMsg()));
responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID",
Integer.toString(metadata.getChunkId()));
}
+ responseBuilder.header("X-Pulsar-Is-Encrypted",
metadata.getEncryptionKeysCount() > 0);
// Decode if needed
CompressionCodec codec =
CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 5c91131..f16d2a6 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -84,6 +84,7 @@ import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
@@ -2988,6 +2989,48 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
final String topicName =
"non-persistent://prop-xyz/ns1/testTruncateTopic-" +
UUID.randomUUID().toString();
admin.topics().createNonPartitionedTopic(topicName);
assertThrows(() -> {admin.topics().truncate(topicName);});
+ }
+
+ @Test(timeOut = 20000)
+ public void testPeekEncryptedMessages() throws Exception {
+ final String topicName =
"persistent://prop-xyz/ns1/testPeekEncryptedMessages-" +
UUID.randomUUID().toString();
+ final String subName = "my-sub";
+
+ admin.topics().createNonPartitionedTopic(topicName);
+ admin.topics().createSubscription(topicName, subName,
MessageId.latest);
+
+ final Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topicName)
+ .enableBatching(true)
+ .addEncryptionKey("my-app-key")
+
.defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem")
+ .create();
+
+ for (int i = 0; i < 5; i++) {
+ producer.send(("message-" + i).getBytes());
+ }
+ producer.close();
+
+ final List<Message<byte[]>> peekedMessages =
admin.topics().peekMessages(topicName, subName, 5);
+ assertEquals(peekedMessages.size(), 5);
+
+ final Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName(subName)
+ .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+ .subscribe();
+ final List<Message<byte[]>> receivedMessages = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+ receivedMessages.add(msg);
+ consumer.acknowledge(msg);
+ }
+ consumer.unsubscribe();
+
+ for (int i = 0; i < 5; i++) {
+ assertEquals(peekedMessages.get(i).getMessageId(),
receivedMessages.get(i).getMessageId());
+ assertEquals(peekedMessages.get(i).getData(),
receivedMessages.get(i).getData());
+ }
}
}
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index ae4de198..d122ded 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1535,8 +1535,17 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
tmp = headers.getFirst(BATCH_HEADER);
- if (response.getHeaderString(BATCH_HEADER) != null) {
+ if (tmp != null) {
properties.put(BATCH_HEADER, (String) tmp);
+ }
+
+ boolean isEncrypted = false;
+ tmp = headers.getFirst("X-Pulsar-Is-Encrypted");
+ if (tmp != null) {
+ isEncrypted = Boolean.parseBoolean(tmp.toString());
+ }
+
+ if (!isEncrypted && response.getHeaderString(BATCH_HEADER) !=
null) {
return getIndividualMsgsFromBatch(topic, msgId, data,
properties, messageMetadata);
}