This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e802b586eb6c7cf8e2a6842aff2a5def5f70749a Author: Jiwei Guo <[email protected]> AuthorDate: Fri Sep 16 23:56:03 2022 +0800 [fix][common] Fix parsing partitionedKey with Base64 encode issue. (#17687) * Fix parsing partitionedKey with Base64 encode issue. * release the buf * fix checkstyle issue. --- .../apache/pulsar/common/protocol/Commands.java | 4 +++ .../pulsar/common/compression/CommandsTest.java | 41 +++++++++++++++++++--- 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 62243063741..61ce70a9a0a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -30,6 +30,7 @@ import io.netty.util.concurrent.FastThreadLocal; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Base64; import java.util.Collections; import java.util.List; import java.util.Map; @@ -1888,6 +1889,9 @@ public class Commands { if (metadata.hasOrderingKey()) { return metadata.getOrderingKey(); } else if (metadata.hasPartitionKey()) { + if (metadata.isPartitionKeyB64Encoded()) { + return Base64.getDecoder().decode(metadata.getPartitionKey()); + } return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8); } } catch (Throwable t) { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java index 24d34ac547f..207c6202426 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java @@ -18,22 +18,23 @@ */ package org.apache.pulsar.common.compression; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; - import com.scurrilous.circe.checksum.Crc32cIntChecksum; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; - import java.io.IOException; - +import java.util.Base64; +import io.netty.util.ReferenceCountUtil; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Commands.ChecksumType; +import org.testng.Assert; import org.testng.annotations.Test; public class CommandsTest { @@ -93,5 +94,35 @@ public class CommandsTest { return computedChecksum; } - + @Test + public void testPeekStickyKey() { + String message = "msg-1"; + String partitionedKey = "key1"; + MessageMetadata messageMetadata2 = new MessageMetadata() + .setSequenceId(1) + .setProducerName("testProducer") + .setPartitionKey(partitionedKey) + .setPartitionKeyB64Encoded(false) + .setPublishTime(System.currentTimeMillis()); + ByteBuf byteBuf = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata2, + Unpooled.copiedBuffer(message.getBytes(UTF_8))); + byte[] bytes = Commands.peekStickyKey(byteBuf, "topic-1", "sub-1"); + String key = new String(bytes); + Assert.assertEquals(partitionedKey, key); + ReferenceCountUtil.safeRelease(byteBuf); + // test 64 encoded + String partitionedKey2 = Base64.getEncoder().encodeToString("key2".getBytes(UTF_8)); + MessageMetadata messageMetadata = new MessageMetadata() + .setSequenceId(1) + .setProducerName("testProducer") + .setPartitionKey(partitionedKey2) + .setPartitionKeyB64Encoded(true) + .setPublishTime(System.currentTimeMillis()); + ByteBuf byteBuf2 = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, + Unpooled.copiedBuffer(message.getBytes(UTF_8))); + byte[] bytes2 = Commands.peekStickyKey(byteBuf2, "topic-2", "sub-2"); + String key2 = Base64.getEncoder().encodeToString(bytes2);; + Assert.assertEquals(partitionedKey2, key2); + ReferenceCountUtil.safeRelease(byteBuf2); + } }
