This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 5d4fe65a7713789346c600fc87c068e41648b377 Author: JiangHaiting <[email protected]> AuthorDate: Tue Jan 18 11:18:27 2022 +0800 Fix encrption bug with chunked message (#13689) # Motivation Fix issue #13688. Send chunking message failed with `org.apache.pulsar.client.api.PulsarClientException$TimeoutException` when encryption is enabled. ### Modifications The root cause is that all chunked messages share the same msgMetadata object. The `EncryptionKeys` will be repeated added into message metadata. And proto buffer objects do not support serialization with bytes value type. (cherry picked from commit c1ff87c99828e0d4dfb45b0c98e3ce5ed91e2221) --- .../client/api/SimpleProducerConsumerTest.java | 33 ++++++++++++++++++++++ .../pulsar/client/impl/crypto/MessageCryptoBc.java | 1 + 2 files changed, 34 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 2eebbf8..fc238d1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -77,6 +77,7 @@ import org.apache.avro.Schema.Parser; import org.apache.bookkeeper.common.concurrent.FutureUtils; import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -2621,6 +2622,38 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { } @Test + public void testCryptoWithChunking() throws Exception { + final String topic = "persistent://my-property/my-ns/testCryptoWithChunking" + System.currentTimeMillis(); + final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; + final String ecdsaPrivateKeyFile = "file:./src/test/resources/certificate/private-key.client-ecdsa.pem"; + + this.conf.setMaxMessageSize(1000); + + @Cleanup + PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0); + + @Cleanup + Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .defaultCryptoKeyReader(ecdsaPrivateKeyFile).subscribe(); + @Cleanup + Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topic) + .enableChunking(true) + .enableBatching(false) + .addEncryptionKey("client-ecdsa.pem") + .defaultCryptoKeyReader(ecdsaPublicKeyFile) + .create(); + + byte[] data = RandomUtils.nextBytes(5100); + MessageId id = producer1.send(data); + log.info("Message Id={}", id); + + MessageImpl<byte[]> message; + message = (MessageImpl<byte[]>) consumer1.receive(); + Assert.assertEquals(message.getData(), data); + Assert.assertEquals(message.getEncryptionCtx().get().getKeys().size(), 1); + } + + @Test public void testDefaultCryptoKeyReader() throws Exception { final String topic = "persistent://my-property/my-ns/default-crypto-key-reader" + System.currentTimeMillis(); final String ecdsaPublicKeyFile = "file:./src/test/resources/certificate/public-key.client-ecdsa.pem"; diff --git a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java index e4f3200..4ce4574 100644 --- a/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java +++ b/pulsar-client-messagecrypto-bc/src/main/java/org/apache/pulsar/client/impl/crypto/MessageCryptoBc.java @@ -389,6 +389,7 @@ public class MessageCryptoBc implements MessageCrypto<MessageMetadata, MessageMe return; } + msgMetadata.clearEncryptionKeys(); // Update message metadata with encrypted data key for (String keyName : encKeys) { if (encryptedDataKeyMap.get(keyName) == null) {
