This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e802b586eb6c7cf8e2a6842aff2a5def5f70749a
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 16 23:56:03 2022 +0800

    [fix][common] Fix parsing partitionedKey with Base64 encode issue. (#17687)
    
    * Fix parsing partitionedKey with Base64 encode issue.
    
    * release the buf
    
    * fix checkstyle issue.
---
 .../apache/pulsar/common/protocol/Commands.java    |  4 +++
 .../pulsar/common/compression/CommandsTest.java    | 41 +++++++++++++++++++---
 2 files changed, 40 insertions(+), 5 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 62243063741..61ce70a9a0a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -30,6 +30,7 @@ import io.netty.util.concurrent.FastThreadLocal;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -1888,6 +1889,9 @@ public class Commands {
             if (metadata.hasOrderingKey()) {
                 return metadata.getOrderingKey();
             } else if (metadata.hasPartitionKey()) {
+                if (metadata.isPartitionKeyB64Encoded()) {
+                    return Base64.getDecoder().decode(metadata.getPartitionKey());
+                }
                 return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
             }
         } catch (Throwable t) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
index 24d34ac547f..207c6202426 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
@@ -18,22 +18,23 @@
  */
 package org.apache.pulsar.common.compression;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-
 import com.scurrilous.circe.checksum.Crc32cIntChecksum;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
-
 import java.io.IOException;
-
+import java.util.Base64;
+import io.netty.util.ReferenceCountUtil;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Commands.ChecksumType;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class CommandsTest {
@@ -93,5 +94,35 @@ public class CommandsTest {
         return computedChecksum;
     }
 
-
+    @Test
+    public void testPeekStickyKey() { // covers both the plain and the Base64-encoded partition key paths
+        String message = "msg-1";
+        String partitionedKey = "key1";
+        MessageMetadata messageMetadata2 = new MessageMetadata()
+                .setSequenceId(1)
+                .setProducerName("testProducer")
+                .setPartitionKey(partitionedKey)
+                .setPartitionKeyB64Encoded(false) // plain key: its UTF-8 bytes are expected back
+                .setPublishTime(System.currentTimeMillis());
+        ByteBuf byteBuf = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata2,
+                Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+        byte[] bytes = Commands.peekStickyKey(byteBuf, "topic-1", "sub-1");
+        String key = new String(bytes, UTF_8); // decode explicitly; do not rely on the platform charset
+        Assert.assertEquals(key, partitionedKey); // TestNG argument order is (actual, expected)
+        ReferenceCountUtil.safeRelease(byteBuf);
+        // Base64-encoded key: the peeked bytes are the decoded key, so re-encoding them must round-trip.
+        String partitionedKey2 = Base64.getEncoder().encodeToString("key2".getBytes(UTF_8));
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setSequenceId(1)
+                .setProducerName("testProducer")
+                .setPartitionKey(partitionedKey2)
+                .setPartitionKeyB64Encoded(true) // flag that makes peekStickyKey Base64-decode the key
+                .setPublishTime(System.currentTimeMillis());
+        ByteBuf byteBuf2 = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
+                Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+        byte[] bytes2 = Commands.peekStickyKey(byteBuf2, "topic-2", "sub-2");
+        String key2 = Base64.getEncoder().encodeToString(bytes2);
+        Assert.assertEquals(key2, partitionedKey2);
+        ReferenceCountUtil.safeRelease(byteBuf2);
+    }
 }

Reply via email to