This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 77f8a4f Fix peek message failure when broker entry metadata is
enabled (#10924)
77f8a4f is described below
commit 77f8a4f49d5f51bab6121a74a2c32907614cc4ff
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Jun 15 14:05:28 2021 +0800
Fix peek message failure when broker entry metadata is enabled (#10924)
### Motivation
When broker entry metadata is enabled, using pulsar-admin to peek messages
will fail with
```
[pulsar-web-29-16] ERROR
org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Failed to
peek message at position 1 from persistent://prop/ns-abc/topic-b604aad8ea8010af
my-sub
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
at
org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270)
~[classes/:?]
at
org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370)
~[classes/:?]
at
org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:426)
~[classes/:?]
```
### Modifications
Skip the broker entry metadata if exists in `generateResponseWithEntry` and
add the test to verify it.
(cherry picked from commit 7603a9d9fc3d592318aab36b4701bff4a0bd6928)
---
.../broker/admin/impl/PersistentTopicsBase.java | 1 +
.../broker/service/BrokerEntryMetadataE2ETest.java | 19 +++++++++++++++++++
2 files changed, 20 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 240bef9..8bf5a7c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2450,6 +2450,7 @@ public class PersistentTopicsBase extends AdminResource {
ByteBuf metadataAndPayload = entry.getDataBuffer();
// moves the readerIndex to the payload
+ Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload);
MessageMetadata metadata =
Commands.parseMessageMetadata(metadataAndPayload);
ResponseBuilder responseBuilder = Response.ok();
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 1954271..5cbaf3d 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -18,9 +18,11 @@
*/
package org.apache.pulsar.broker.service;
+import java.util.List;
import lombok.Cleanup;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.assertj.core.util.Sets;
@@ -90,4 +92,21 @@ public class BrokerEntryMetadataE2ETest extends
BrokerTestBase {
Assert.assertEquals(messages, receives);
}
+
+ @Test(timeOut = 20000)
+ public void testPeekMessage() throws Exception {
+ final String topic = newTopicName();
+ final String subscription = "my-sub";
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(topic)
+ .create();
+ producer.newMessage().value("hello".getBytes()).send();
+
+ admin.topics().createSubscription(topic, subscription,
MessageId.earliest);
+ final List<Message<byte[]>> messages =
admin.topics().peekMessages(topic, subscription, 1);
+ Assert.assertEquals(messages.size(), 1);
+ Assert.assertEquals(messages.get(0).getData(), "hello".getBytes());
+ }
}