This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 60161ed Fix peek message metadata broker while enable broker entry
metadata. (#9255)
60161ed is described below
commit 60161ede9da80e6e1840b5fad0c1d0d11f43ccaa
Author: lipenghui <[email protected]>
AuthorDate: Mon Jan 25 08:16:21 2021 +0800
Fix peek message metadata broker while enable broker entry metadata. (#9255)
### Motivation
Fix peek message metadata broker while enable broker entry metadata.
When enabled the broker entry metadata, following error occurs:
```
22:09:57.802
[broker-topic-workers-OrderedScheduler-4-0:org.apache.pulsar.common.protocol.Commands@1658]
ERROR org.apache.pulsar.common.protocol.Commands -
[PersistentSubscription{topic=persistent://public/default/__consumer_offsets-partition-0,
name=reader-31a9742e6c}] [-1] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
at
org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270)
~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
at
org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370)
~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
at
org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:425)
~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
at
org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:415)
~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
at
org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1653)
~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
at
org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:82)
~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
at
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:232)
~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
at
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$readEntriesComplete$1(PersistentDispatcherSingleActiveConsumer.java:178)
~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
at
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
[managed-ledger-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
at
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
[bookkeeper-common-4.12.1.jar:4.12.1]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[?:1.8.0_261]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
[?:1.8.0_261]
at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_261]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
[?:1.8.0_261]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
[?:1.8.0_261]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[?:1.8.0_261]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[?:1.8.0_261]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[netty-common-4.1.51.Final.jar:4.1.51.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
```
The root cause is peeking message metadata does not skip the broker entry
metadata.
---
.../broker/service/AbstractBaseDispatcher.java | 13 +--
.../broker/service/BrokerEntryMetadataE2ETest.java | 92 ++++++++++++++++++++++
.../client/api/KeySharedSubscriptionTest.java | 3 +-
.../apache/pulsar/common/protocol/Commands.java | 21 ++++-
.../src/main/resources/findbugsExclude.xml | 3 +
5 files changed, 118 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index ec85048..02305e4 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -165,19 +165,8 @@ public abstract class AbstractBaseDispatcher implements
Dispatcher {
// noop
}
- public static final String NONE_KEY = "NONE_KEY";
-
protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
- metadataAndPayload.markReaderIndex();
- MessageMetadata metadata =
Commands.parseMessageMetadata(metadataAndPayload);
- metadataAndPayload.resetReaderIndex();
- byte[] key = NONE_KEY.getBytes();
- if (metadata.hasOrderingKey()) {
- return metadata.getOrderingKey();
- } else if (metadata.hasPartitionKey()) {
- return metadata.getPartitionKey().getBytes();
- }
- return key;
+ return Commands.peekStickyKey(metadataAndPayload,
subscription.getTopicName(), subscription.getName());
}
protected void addMessageToReplay(long ledgerId, long entryId) {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
new file mode 100644
index 0000000..7c1ca28
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.assertj.core.util.Sets;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test for the broker entry metadata.
+ */
+public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
+
+ @DataProvider(name = "subscriptionTypes")
+ public static Object[] subscriptionTypes() {
+ return new Object[] {
+ SubscriptionType.Exclusive,
+ SubscriptionType.Failover,
+ SubscriptionType.Shared,
+ SubscriptionType.Key_Shared
+ };
+ }
+
+ @BeforeClass
+ protected void setup() throws Exception {
+ conf.setBrokerEntryMetadataInterceptors(Sets.newTreeSet(
+
"org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor",
+
"org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"
+ ));
+ baseSetup();
+ }
+
+ @AfterClass(alwaysRun = true)
+ protected void cleanup() throws Exception {
+ internalCleanup();
+ }
+
+ @Test(dataProvider = "subscriptionTypes")
+ public void testProduceAndConsume(SubscriptionType subType) throws
Exception {
+ final String topic = newTopicName();
+ final int messages = 10;
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionType(subType)
+ .subscriptionName("my-sub")
+ .subscribe();
+
+ for (int i = 0; i < messages; i++) {
+ producer.send(String.valueOf(i).getBytes());
+ }
+
+ int receives = 0;
+ for (int i = 0; i < messages; i++) {
+ Message<byte[]> received = consumer.receive();
+ ++ receives;
+ Assert.assertEquals(i, Integer.valueOf(new
String(received.getValue())).intValue());
+ }
+
+ Assert.assertEquals(messages, receives);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 50011d4..938ffed 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -44,6 +44,7 @@ import
org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.pulsar.broker.service.Topic;
import
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.slf4j.Logger;
@@ -306,7 +307,7 @@ public class KeySharedSubscriptionTest extends
ProducerConsumerBase {
.value(i)
.send();
}
- int slot =
Murmur3_32Hash.getInstance().makeHash(PersistentStickyKeyDispatcherMultipleConsumers.NONE_KEY.getBytes())
+ int slot = Murmur3_32Hash.getInstance().makeHash("NONE_KEY".getBytes())
% KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
List<KeyValue<Consumer<Integer>, Integer>> checkList = new
ArrayList<>();
if (slot <= 20000) {
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index a29ceb1..55497fc 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -28,12 +28,12 @@ import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -1654,6 +1654,7 @@ public class Commands {
try {
// save the reader index and restore after parsing
int readerIdx = metadataAndPayload.readerIndex();
+ skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata =
Commands.parseMessageMetadata(metadataAndPayload);
metadataAndPayload.readerIndex(readerIdx);
@@ -1664,6 +1665,24 @@ public class Commands {
}
}
+ private static final byte[] NONE_KEY =
"NONE_KEY".getBytes(StandardCharsets.UTF_8);
+ public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String
topic, String subscription) {
+ try {
+ int readerIdx = metadataAndPayload.readerIndex();
+ skipBrokerEntryMetadataIfExist(metadataAndPayload);
+ MessageMetadata metadata =
Commands.parseMessageMetadata(metadataAndPayload);
+ metadataAndPayload.readerIndex(readerIdx);
+ if (metadata.hasOrderingKey()) {
+ return metadata.getOrderingKey();
+ } else if (metadata.hasPartitionKey()) {
+ return
metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
+ }
+ } catch (Throwable t) {
+ log.error("[{}] [{}] Failed to peek sticky key from the message
metadata", topic, subscription, t);
+ }
+ return Commands.NONE_KEY;
+ }
+
public static int getCurrentProtocolVersion() {
// Return the last ProtocolVersion enum value
return ProtocolVersion.values()[ProtocolVersion.values().length -
1].getValue();
diff --git a/pulsar-common/src/main/resources/findbugsExclude.xml
b/pulsar-common/src/main/resources/findbugsExclude.xml
index 1706bca..df161c4 100644
--- a/pulsar-common/src/main/resources/findbugsExclude.xml
+++ b/pulsar-common/src/main/resources/findbugsExclude.xml
@@ -44,6 +44,9 @@
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
<Match>
+ <Bug pattern="MS_EXPOSE_REP"/>
+ </Match>
+ <Match>
<Bug pattern="UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD"/>
</Match>
<Match>