This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 77f8a4f  Fix peek message failure when broker entry metadata is 
enabled (#10924)
77f8a4f is described below

commit 77f8a4f49d5f51bab6121a74a2c32907614cc4ff
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Jun 15 14:05:28 2021 +0800

    Fix peek message failure when broker entry metadata is enabled (#10924)
    
    ### Motivation
    
    When broker entry metadata is enabled, using pulsar-admin to peek messages 
fails with the following error:
    
    ```
    [pulsar-web-29-16] ERROR 
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to 
peek message at position 1 from persistent://prop/ns-abc/topic-b604aad8ea8010af 
my-sub
    java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
        at 
org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270)
 ~[classes/:?]
        at 
org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370)
 ~[classes/:?]
        at 
org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:426)
 ~[classes/:?]
    ```
    
    ### Modifications
    
    Skip the broker entry metadata, if it exists, in `generateResponseWithEntry` 
before parsing the message metadata, and add a test to verify the fix.
    
    (cherry picked from commit 7603a9d9fc3d592318aab36b4701bff4a0bd6928)
---
 .../broker/admin/impl/PersistentTopicsBase.java       |  1 +
 .../broker/service/BrokerEntryMetadataE2ETest.java    | 19 +++++++++++++++++++
 2 files changed, 20 insertions(+)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 240bef9..8bf5a7c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2450,6 +2450,7 @@ public class PersistentTopicsBase extends AdminResource {
         ByteBuf metadataAndPayload = entry.getDataBuffer();
 
         // moves the readerIndex to the payload
+        Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
         MessageMetadata metadata = 
Commands.parseMessageMetadata(metadataAndPayload);
 
         ResponseBuilder responseBuilder = Response.ok();
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 1954271..5cbaf3d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.util.List;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.assertj.core.util.Sets;
@@ -90,4 +92,21 @@ public class BrokerEntryMetadataE2ETest extends 
BrokerTestBase {
 
         Assert.assertEquals(messages, receives);
     }
+
+    @Test(timeOut = 20000)
+    public void testPeekMessage() throws Exception {
+        final String topic = newTopicName();
+        final String subscription = "my-sub";
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        producer.newMessage().value("hello".getBytes()).send();
+
+        admin.topics().createSubscription(topic, subscription, 
MessageId.earliest);
+        final List<Message<byte[]>> messages = 
admin.topics().peekMessages(topic, subscription, 1);
+        Assert.assertEquals(messages.size(), 1);
+        Assert.assertEquals(messages.get(0).getData(), "hello".getBytes());
+    }
 }

Reply via email to