This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5d4fe65a7713789346c600fc87c068e41648b377
Author: JiangHaiting <[email protected]>
AuthorDate: Tue Jan 18 11:18:27 2022 +0800

    Fix encrption bug with chunked message (#13689)
    
    # Motivation
    
    Fix issue #13688.
    
    Send chunking message failed with 
`org.apache.pulsar.client.api.PulsarClientException$TimeoutException` when 
encryption is enabled.
    
    ### Modifications
    
    The root cause is that all chunked messages share the same msgMetadata 
object.
    The `EncryptionKeys` will be repeated added into message metadata.
    And proto buffer objects do not support serialization with bytes value type.
    
    (cherry picked from commit c1ff87c99828e0d4dfb45b0c98e3ce5ed91e2221)
---
 .../client/api/SimpleProducerConsumerTest.java     | 33 ++++++++++++++++++++++
 .../pulsar/client/impl/crypto/MessageCryptoBc.java |  1 +
 2 files changed, 34 insertions(+)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2eebbf8..fc238d1 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -77,6 +77,7 @@ import org.apache.avro.Schema.Parser;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
 import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -2621,6 +2622,38 @@ public class SimpleProducerConsumerTest extends 
ProducerConsumerBase {
     }
 
     @Test
+    public void testCryptoWithChunking() throws Exception {
+        final String topic = 
"persistent://my-property/my-ns/testCryptoWithChunking" + 
System.currentTimeMillis();
+        final String ecdsaPublicKeyFile = 
"file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
+        final String ecdsaPrivateKeyFile = 
"file:./src/test/resources/certificate/private-key.client-ecdsa.pem";
+
+        this.conf.setMaxMessageSize(1000);
+
+        @Cleanup
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+
+        @Cleanup
+        Consumer<byte[]> consumer1 = 
pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe();
+        @Cleanup
+        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic)
+                .enableChunking(true)
+                .enableBatching(false)
+                .addEncryptionKey("client-ecdsa.pem")
+                .defaultCryptoKeyReader(ecdsaPublicKeyFile)
+                .create();
+
+        byte[] data = RandomUtils.nextBytes(5100);
+        MessageId id = producer1.send(data);
+        log.info("Message Id={}", id);
+
+        MessageImpl<byte[]> message;
+        message = (MessageImpl<byte[]>) consumer1.receive();
+        Assert.assertEquals(message.getData(), data);
+        Assert.assertEquals(message.getEncryptionCtx().get().getKeys().size(), 
1);
+    }
+
+    @Test
     public void testDefaultCryptoKeyReader() throws Exception {
         final String topic = 
"persistent://my-property/my-ns/default-crypto-key-reader" + 
System.currentTimeMillis();
         final String ecdsaPublicKeyFile = 
"file:./src/test/resources/certificate/public-key.client-ecdsa.pem";
diff --git 
a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
 
b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
index e4f3200..4ce4574 100644
--- 
a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
+++ 
b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java
@@ -389,6 +389,7 @@ public class MessageCryptoBc implements 
MessageCrypto<MessageMetadata, MessageMe
             return;
         }
 
+        msgMetadata.clearEncryptionKeys();
         // Update message metadata with encrypted data key
         for (String keyName : encKeys) {
             if (encryptedDataKeyMap.get(keyName) == null) {

Reply via email to