This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ea3c064fb2f78ee412b8708b966048ac46f681de
Author: JiangHaiting <[email protected]>
AuthorDate: Tue Jan 18 11:51:56 2022 +0800

    fix ordering key with chunking (#13699)

    Master Issue: #13688

    ### Motivation

    The root cause is the same as #13688: the chunks of a message share the same
    metadata, and the ordering key is held as a ByteBuf, which cannot be
    serialized twice.

    ### Modifications

    Reset the field before sending each subsequent chunk.

    (cherry picked from commit 8dc29755b8448d2a94f6e6af60b7d56ef591e05f)
---
 .../pulsar/client/impl/MessageChunkingTest.java    | 25 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 11 ++++++++--
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 40191ef..fc935a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.CompressionType;
@@ -62,6 +63,7 @@ import org.apache.pulsar.common.protocol.Commands.ChecksumType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -167,6 +169,29 @@ public class MessageChunkingTest extends ProducerConsumerBase {
 
     }
 
+    @Test
+    public void testChunkingWithOrderingKey() throws Exception {
+        this.conf.setMaxMessageSize(5);
+
+        final String topicName = "persistent://my-property/my-ns/testChunkingWithOrderingKey";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableChunking(true)
+                .enableBatching(false).create();
+
+        byte[] data = RandomUtils.nextBytes(20);
+        byte[] ok = RandomUtils.nextBytes(10);
+        producer.newMessage().value(data).orderingKey(ok).send();
+
+        Message<byte[]> msg = consumer.receive();
+        Assert.assertEquals(msg.getData(), data);
+        Assert.assertEquals(msg.getOrderingKey(), ok);
+    }
+
     @Test(dataProvider = "ackReceiptEnabled")
     public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws Exception {

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 46649bd..95adbcc 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -490,13 +490,20 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         String uuid = totalChunks > 1 ? String.format("%s-%d", producerName, sequenceId) : null;
         byte[] schemaVersion = totalChunks > 1 && msg.getMessageBuilder().hasSchemaVersion() ?
                 msg.getMessageBuilder().getSchemaVersion() : null;
+        byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey() ?
+                msg.getMessageBuilder().getOrderingKey() : null;
         for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
             // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in
             // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`,
             // we need to reset the ByteBuf of the schemaVersion in `MessageMetadata`, I think we need to
             // reset `ByteBuf` objects in `MessageMetadata` after call the method `MessageMetadata#writeTo()`.
-            if (chunkId > 0 && schemaVersion != null) {
-                msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+            if (chunkId > 0) {
+                if (schemaVersion != null) {
+                    msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                }
+                if (orderingKey != null) {
+                    msg.getMessageBuilder().setOrderingKey(orderingKey);
+                }
             }
             serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks, readStartIndex,
                     ClientCnx.getMaxMessageSize(), compressedPayload, compressed,
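
The new MessageChunkingTest case exercises the fix through the in-process test harness. As a rough illustration only, the same scenario can be driven against a running broker with plain client code. The sketch below is not part of this commit; the service URL, topic name, subscription name, class name, and payload size are assumptions, and the payload merely has to exceed the broker's maxMessageSize (5 MB by default) so that the message is actually split into chunks.

// Minimal sketch, not part of the commit: sends one chunked message carrying an
// ordering key and checks that the key survives reassembly on the consumer side.
// Assumptions: a broker at pulsar://localhost:6650, a hypothetical topic name,
// and a payload larger than the broker's maxMessageSize (5 MB by default).
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class ChunkedOrderingKeyExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        String topic = "persistent://public/default/chunked-ordering-key";

        Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .subscriptionName("example-sub")
                .subscribe();

        // Chunking only works with batching disabled.
        Producer<byte[]> producer = client.newProducer()
                .topic(topic)
                .enableChunking(true)
                .enableBatching(false)
                .create();

        byte[] payload = new byte[10 * 1024 * 1024]; // ~10 MB, forces chunking
        byte[] orderingKey = "order-key-1".getBytes(StandardCharsets.UTF_8);

        // Before this fix, the ByteBuf-backed ordering key in MessageMetadata could not
        // be serialized again for the chunks after the first, so the reassembled message
        // did not come back with the expected ordering key.
        producer.newMessage().orderingKey(orderingKey).value(payload).send();

        Message<byte[]> msg = consumer.receive();
        System.out.println("ordering key preserved: "
                + (msg.hasOrderingKey() && Arrays.equals(msg.getOrderingKey(), orderingKey)));
        consumer.acknowledge(msg);

        producer.close();
        consumer.close();
        client.close();
    }
}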
