This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ea3c064fb2f78ee412b8708b966048ac46f681de
Author: JiangHaiting <[email protected]>
AuthorDate: Tue Jan 18 11:51:56 2022 +0800

    fix ordering key with chunking (#13699)
    
    Master Issue: #13688
    
    ### Motivation
    
    The root cause is the same as #13688. Chunked messages share the same 
metadata, and the ordering key is stored as a ByteBuf, which cannot be 
serialized twice.
    
    ### Modifications
    
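    Reset the ordering key in the shared metadata before sending each chunk after the first, as is already done for the schema version.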
    
    (cherry picked from commit 8dc29755b8448d2a94f6e6af60b7d56ef591e05f)
---
 .../pulsar/client/impl/MessageChunkingTest.java    | 25 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 11 ++++++++--
 2 files changed, 34 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
index 40191ef..fc935a2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java
@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.CompressionType;
@@ -62,6 +63,7 @@ import 
org.apache.pulsar.common.protocol.Commands.ChecksumType;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
@@ -167,6 +169,29 @@ public class MessageChunkingTest extends 
ProducerConsumerBase {
 
     }
 
+    @Test
+    public void testChunkingWithOrderingKey() throws Exception {
+        this.conf.setMaxMessageSize(5);
+
+        final String topicName = 
"persistent://my-property/my-ns/testChunkingWithOrderingKey";
+
+        @Cleanup
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name")
+                .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).enableChunking(true)
+                .enableBatching(false).create();
+
+        byte[] data = RandomUtils.nextBytes(20);
+        byte[] ok = RandomUtils.nextBytes(10);
+        producer.newMessage().value(data).orderingKey(ok).send();
+
+        Message<byte[]> msg = consumer.receive();
+        Assert.assertEquals(msg.getData(), data);
+        Assert.assertEquals(msg.getOrderingKey(), ok);
+    }
+
     @Test(dataProvider = "ackReceiptEnabled")
     public void testLargeMessageAckTimeOut(boolean ackReceiptEnabled) throws 
Exception {
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 46649bd..95adbcc 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -490,13 +490,20 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 String uuid = totalChunks > 1 ? String.format("%s-%d", 
producerName, sequenceId) : null;
                 byte[] schemaVersion = totalChunks > 1 && 
msg.getMessageBuilder().hasSchemaVersion() ?
                         msg.getMessageBuilder().getSchemaVersion() : null;
+                byte[] orderingKey = totalChunks > 1 && 
msg.getMessageBuilder().hasOrderingKey() ?
+                        msg.getMessageBuilder().getOrderingKey() : null;
                 for (int chunkId = 0; chunkId < totalChunks; chunkId++) {
                     // Need to reset the schemaVersion, because the 
schemaVersion is based on a ByteBuf object in
                     // `MessageMetadata`, if we want to re-serialize the 
`SEND` command using a same `MessageMetadata`,
                     // we need to reset the ByteBuf of the schemaVersion in 
`MessageMetadata`, I think we need to
                     // reset `ByteBuf` objects in `MessageMetadata` after call 
the method `MessageMetadata#writeTo()`.
-                    if (chunkId > 0 && schemaVersion != null) {
-                        
msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                    if (chunkId > 0) {
+                        if (schemaVersion != null) {
+                            
msg.getMessageBuilder().setSchemaVersion(schemaVersion);
+                        }
+                        if (orderingKey != null) {
+                            
msg.getMessageBuilder().setOrderingKey(orderingKey);
+                        }
                     }
                     serializeAndSendMessage(msg, payload, sequenceId, uuid, 
chunkId, totalChunks,
                             readStartIndex, ClientCnx.getMaxMessageSize(), 
compressedPayload, compressed,

Reply via email to