This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 4420dd0de81 cherry-pick -x "628e760"
4420dd0de81 is described below
commit 4420dd0de817c9be853ed0bb9feb1ce5baef6390
Author: Jiwei Guo <[email protected]>
AuthorDate: Fri Sep 16 23:56:03 2022 +0800
cherry-pick -x "628e760"
---
.../apache/pulsar/common/protocol/Commands.java | 4 +++
.../pulsar/common/compression/CommandsTest.java | 41 +++++++++++++++++++---
2 files changed, 40 insertions(+), 5 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 091c9b8b7c6..934e331bd6b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -29,6 +29,7 @@ import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -1731,6 +1732,9 @@ public class Commands {
if (metadata.hasOrderingKey()) {
return metadata.getOrderingKey();
} else if (metadata.hasPartitionKey()) {
+ if (metadata.isPartitionKeyB64Encoded()) {
+ return Base64.getDecoder().decode(metadata.getPartitionKey());
+ }
return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
}
} catch (Throwable t) {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
index 24d34ac547f..207c6202426 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CommandsTest.java
@@ -18,22 +18,23 @@
*/
package org.apache.pulsar.common.compression;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
-
import java.io.IOException;
-
+import java.util.Base64;
+import io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class CommandsTest {
@@ -93,5 +94,35 @@ public class CommandsTest {
return computedChecksum;
}
-
+ @Test
+ public void testPeekStickyKey() {
+ String message = "msg-1";
+ String partitionedKey = "key1";
+ MessageMetadata messageMetadata2 = new MessageMetadata()
+ .setSequenceId(1)
+ .setProducerName("testProducer")
+ .setPartitionKey(partitionedKey)
+ .setPartitionKeyB64Encoded(false)
+ .setPublishTime(System.currentTimeMillis());
+ ByteBuf byteBuf = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata2,
+ Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+ byte[] bytes = Commands.peekStickyKey(byteBuf, "topic-1", "sub-1");
+ String key = new String(bytes);
+ Assert.assertEquals(partitionedKey, key);
+ ReferenceCountUtil.safeRelease(byteBuf);
+ // test 64 encoded
+ String partitionedKey2 = Base64.getEncoder().encodeToString("key2".getBytes(UTF_8));
+ MessageMetadata messageMetadata = new MessageMetadata()
+ .setSequenceId(1)
+ .setProducerName("testProducer")
+ .setPartitionKey(partitionedKey2)
+ .setPartitionKeyB64Encoded(true)
+ .setPublishTime(System.currentTimeMillis());
+ ByteBuf byteBuf2 = serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
+ Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+ byte[] bytes2 = Commands.peekStickyKey(byteBuf2, "topic-2", "sub-2");
+ String key2 = Base64.getEncoder().encodeToString(bytes2);;
+ Assert.assertEquals(partitionedKey2, key2);
+ ReferenceCountUtil.safeRelease(byteBuf2);
+ }
}