This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9c3e904ae56b660ca7b0f7b201effcc884654fd5 Author: Aloys <[email protected]> AuthorDate: Sat Jun 19 02:00:48 2021 +0800 fix parseMessageMetadata error cause by not skip broker entry metadata (#10968) Fixes #10967 ### Motivation fix parseMessageMetadata error cause by not skip broker entry metadata ### Modifications skip broker entry metadata if exist before parsing message metadata (cherry picked from commit 0774b5fddddc0c9fe9b7cc00ae40e43322690ef1) --- .../broker/admin/impl/PersistentTopicsBase.java | 2 - .../broker/service/BrokerEntryMetadataE2ETest.java | 20 +++++++++ .../org/apache/pulsar/client/impl/MessageImpl.java | 2 - .../apache/pulsar/client/impl/MessageImplTest.java | 51 ++++++++++++++++++++++ .../apache/pulsar/common/protocol/Commands.java | 5 ++- 5 files changed, 74 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4a03cd1..76211b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2451,8 +2451,6 @@ public class PersistentTopicsBase extends AdminResource { PositionImpl pos = (PositionImpl) entry.getPosition(); ByteBuf metadataAndPayload = entry.getDataBuffer(); - // moves the readerIndex to the payload - Commands.skipBrokerEntryMetadataIfExist(metadataAndPayload); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); ResponseBuilder responseBuilder = Response.ok(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java index 5cbaf3d..e7d98a8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java @@ -109,4 +109,24 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase { Assert.assertEquals(messages.size(), 1); Assert.assertEquals(messages.get(0).getData(), "hello".getBytes()); } + + @Test(timeOut = 20000) + public void testGetLastMessageId() throws Exception { + final String topic = "persistent://prop/ns-abc/topic-test"; + final String subscription = "my-sub"; + + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topic) + .create(); + producer.newMessage().value("hello".getBytes()).send(); + + @Cleanup + Consumer<byte[]> consumer = pulsarClient.newConsumer() + .topic(topic) + .subscriptionType(SubscriptionType.Exclusive) + .subscriptionName(subscription) + .subscribe(); + consumer.getLastMessageId(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index efb66e5..c9370f3 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -300,8 +300,6 @@ public class MessageImpl<T> implements Message<T> { @SuppressWarnings("unchecked") MessageImpl<byte[]> msg = (MessageImpl<byte[]>) RECYCLER.get(); - Commands.skipBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata); - Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata, msg.msgMetadata); msg.payload = headersAndPayloadWithBrokerEntryMetadata; msg.messageId = null; diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java index 0a57d93..17b77a2 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java @@ -483,4 +483,55 @@ public class MessageImplTest { fail(); } } + + @Test(timeOut = 30000) + public void testParseMessageMetadataWithBrokerEntryMetadata() { + int MOCK_BATCH_SIZE = 10; + String data = "test-message"; + ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); + byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8)); + + // first, build a message with broker entry metadata + + // build message metadata + MessageMetadata messageMetadata = new MessageMetadata() + .setPublishTime(1) + .setProducerName("test") + .setSequenceId(1); + byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf); + + // build broker entry metadata + BrokerEntryMetadata brokerMetadata = new BrokerEntryMetadata() + .setIndex(MOCK_BATCH_SIZE - 1); + + // build final data which contains broker entry metadata + int brokerMetaSize = brokerMetadata.getSerializedSize(); + ByteBuf brokerMeta = PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6); + brokerMeta.writeShort(Commands.magicBrokerEntryMetadata); + brokerMeta.writeInt(brokerMetaSize); + brokerMetadata.writeTo(brokerMeta); + + CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); + compositeByteBuf.addComponents(true, brokerMeta, byteBuf); + + CompositeByteBuf dupCompositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); + dupCompositeByteBuf.addComponents(true, brokerMeta, byteBuf); + + //second, parse message metadata without skip broker entry metadata + Commands.skipChecksumIfPresent(compositeByteBuf); + int metadataSize = (int) compositeByteBuf.readUnsignedInt(); + MessageMetadata md = new MessageMetadata(); + try { + md.parseFrom(compositeByteBuf, metadataSize); + Assert.fail("Parse operation should be failed."); + } catch (IllegalArgumentException e) { + // expected + } + + //third, parse message metadata with skip broker entry metadata first + MessageMetadata metadata = Commands.parseMessageMetadata(dupCompositeByteBuf); + assertEquals(metadata.getPublishTime(), 1); + assertEquals(metadata.getProducerName(), "test"); + assertEquals(metadata.getSequenceId(), 1); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 0c80395..7c93ecc 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -418,6 +418,9 @@ public class Commands { } public static void parseMessageMetadata(ByteBuf buffer, MessageMetadata msgMetadata) { + // initially reader-index may point to start of broker entry metadata : + // increment reader-index to start_of_headAndPayload to parse metadata + skipBrokerEntryMetadataIfExist(buffer); // initially reader-index may point to start_of_checksum : increment reader-index to start_of_metadata // to parse metadata skipChecksumIfPresent(buffer); @@ -1667,7 +1670,6 @@ public class Commands { try { // save the reader index and restore after parsing int readerIdx = metadataAndPayload.readerIndex(); - skipBrokerEntryMetadataIfExist(metadataAndPayload); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); metadataAndPayload.readerIndex(readerIdx); @@ -1682,7 +1684,6 @@ public class Commands { public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) { try { int readerIdx = metadataAndPayload.readerIndex(); - skipBrokerEntryMetadataIfExist(metadataAndPayload); MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); metadataAndPayload.readerIndex(readerIdx); if (metadata.hasOrderingKey()) {
