This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 60161ed  Fix peek message metadata broker while enable broker entry 
metadata. (#9255)
60161ed is described below

commit 60161ede9da80e6e1840b5fad0c1d0d11f43ccaa
Author: lipenghui <[email protected]>
AuthorDate: Mon Jan 25 08:16:21 2021 +0800

    Fix peek message metadata broker while enable broker entry metadata. (#9255)
    
    ### Motivation
    
    Fix peeking message metadata in the broker when broker entry metadata is enabled.
    
    When broker entry metadata is enabled, the following error occurs:
    
    ```
    22:09:57.802 
[broker-topic-workers-OrderedScheduler-4-0:org.apache.pulsar.common.protocol.Commands@1658]
 ERROR org.apache.pulsar.common.protocol.Commands - 
[PersistentSubscription{topic=persistent://public/default/__consumer_offsets-partition-0,
 name=reader-31a9742e6c}] [-1] Failed to parse message metadata
    java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
        at 
org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270)
 ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
        at 
org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370)
 ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
        at 
org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:425)
 ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
        at 
org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:415)
 ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
        at 
org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1653)
 ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
        at 
org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:82)
 ~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
        at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:232)
 ~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
        at 
org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$readEntriesComplete$1(PersistentDispatcherSingleActiveConsumer.java:178)
 ~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
        at 
org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) 
[managed-ledger-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
        at 
org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) 
[bookkeeper-common-4.12.1.jar:4.12.1]
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
[?:1.8.0_261]
        at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) 
[?:1.8.0_261]
        at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_261]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 [?:1.8.0_261]
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 [?:1.8.0_261]
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_261]
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_261]
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 [netty-common-4.1.51.Final.jar:4.1.51.Final]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
    ```
    
    The root cause is that peeking the message metadata does not skip the broker entry 
metadata.
---
 .../broker/service/AbstractBaseDispatcher.java     | 13 +--
 .../broker/service/BrokerEntryMetadataE2ETest.java | 92 ++++++++++++++++++++++
 .../client/api/KeySharedSubscriptionTest.java      |  3 +-
 .../apache/pulsar/common/protocol/Commands.java    | 21 ++++-
 .../src/main/resources/findbugsExclude.xml         |  3 +
 5 files changed, 118 insertions(+), 14 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index ec85048..02305e4 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -165,19 +165,8 @@ public abstract class AbstractBaseDispatcher implements 
Dispatcher {
         // noop
     }
 
-    public static final String NONE_KEY = "NONE_KEY";
-
     protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
-        metadataAndPayload.markReaderIndex();
-        MessageMetadata metadata = 
Commands.parseMessageMetadata(metadataAndPayload);
-        metadataAndPayload.resetReaderIndex();
-        byte[] key = NONE_KEY.getBytes();
-        if (metadata.hasOrderingKey()) {
-            return metadata.getOrderingKey();
-        } else if (metadata.hasPartitionKey()) {
-            return metadata.getPartitionKey().getBytes();
-        }
-        return key;
+        return Commands.peekStickyKey(metadataAndPayload, 
subscription.getTopicName(), subscription.getName());
     }
 
     protected void addMessageToReplay(long ledgerId, long entryId) {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
new file mode 100644
index 0000000..7c1ca28
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.assertj.core.util.Sets;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test for the broker entry metadata.
+ */
+public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
+
+    @DataProvider(name = "subscriptionTypes")
+    public static Object[] subscriptionTypes() {
+        return new Object[] {
+                SubscriptionType.Exclusive,
+                SubscriptionType.Failover,
+                SubscriptionType.Shared,
+                SubscriptionType.Key_Shared
+        };
+    }
+
+    @BeforeClass
+    protected void setup() throws Exception {
+        conf.setBrokerEntryMetadataInterceptors(Sets.newTreeSet(
+                
"org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor",
+                
"org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"
+                ));
+        baseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    protected void cleanup() throws Exception {
+        internalCleanup();
+    }
+
+    @Test(dataProvider = "subscriptionTypes")
+    public void testProduceAndConsume(SubscriptionType subType) throws 
Exception {
+        final String topic = newTopicName();
+        final int messages = 10;
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(topic)
+                .subscriptionType(subType)
+                .subscriptionName("my-sub")
+                .subscribe();
+
+        for (int i = 0; i < messages; i++) {
+            producer.send(String.valueOf(i).getBytes());
+        }
+
+        int receives = 0;
+        for (int i = 0; i < messages; i++) {
+            Message<byte[]> received = consumer.receive();
+            ++ receives;
+            Assert.assertEquals(i, Integer.valueOf(new 
String(received.getValue())).intValue());
+        }
+
+        Assert.assertEquals(messages, receives);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
index 50011d4..938ffed 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java
@@ -44,6 +44,7 @@ import 
org.apache.curator.shaded.com.google.common.collect.Lists;
 import org.apache.pulsar.broker.service.Topic;
 import 
org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.slf4j.Logger;
@@ -306,7 +307,7 @@ public class KeySharedSubscriptionTest extends 
ProducerConsumerBase {
                     .value(i)
                     .send();
         }
-        int slot = 
Murmur3_32Hash.getInstance().makeHash(PersistentStickyKeyDispatcherMultipleConsumers.NONE_KEY.getBytes())
+        int slot = Murmur3_32Hash.getInstance().makeHash("NONE_KEY".getBytes())
                 % KeySharedPolicy.DEFAULT_HASH_RANGE_SIZE;
         List<KeyValue<Consumer<Integer>, Integer>> checkList = new 
ArrayList<>();
         if (slot <= 20000) {
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index a29ceb1..55497fc 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -28,12 +28,12 @@ import io.netty.buffer.Unpooled;
 import io.netty.util.concurrent.FastThreadLocal;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.stream.Collectors;
 import lombok.experimental.UtilityClass;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -1654,6 +1654,7 @@ public class Commands {
         try {
             // save the reader index and restore after parsing
             int readerIdx = metadataAndPayload.readerIndex();
+            skipBrokerEntryMetadataIfExist(metadataAndPayload);
             MessageMetadata metadata = 
Commands.parseMessageMetadata(metadataAndPayload);
             metadataAndPayload.readerIndex(readerIdx);
 
@@ -1664,6 +1665,24 @@ public class Commands {
         }
     }
 
+    private static final byte[] NONE_KEY = 
"NONE_KEY".getBytes(StandardCharsets.UTF_8);
+    public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String 
topic, String subscription) {
+        try {
+            int readerIdx = metadataAndPayload.readerIndex();
+            skipBrokerEntryMetadataIfExist(metadataAndPayload);
+            MessageMetadata metadata = 
Commands.parseMessageMetadata(metadataAndPayload);
+            metadataAndPayload.readerIndex(readerIdx);
+            if (metadata.hasOrderingKey()) {
+                return metadata.getOrderingKey();
+            } else if (metadata.hasPartitionKey()) {
+                return 
metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
+            }
+        } catch (Throwable t) {
+            log.error("[{}] [{}] Failed to peek sticky key from the message 
metadata", topic, subscription, t);
+        }
+        return Commands.NONE_KEY;
+    }
+
     public static int getCurrentProtocolVersion() {
         // Return the last ProtocolVersion enum value
         return ProtocolVersion.values()[ProtocolVersion.values().length - 
1].getValue();
diff --git a/pulsar-common/src/main/resources/findbugsExclude.xml 
b/pulsar-common/src/main/resources/findbugsExclude.xml
index 1706bca..df161c4 100644
--- a/pulsar-common/src/main/resources/findbugsExclude.xml
+++ b/pulsar-common/src/main/resources/findbugsExclude.xml
@@ -44,6 +44,9 @@
         <Bug pattern="EI_EXPOSE_REP2"/>
     </Match>
     <Match>
+        <Bug pattern="MS_EXPOSE_REP"/>
+    </Match>
+    <Match>
         <Bug pattern="UUF_UNUSED_PUBLIC_OR_PROTECTED_FIELD"/>
     </Match>
     <Match>

Reply via email to