This is an automated email from the ASF dual-hosted git repository. zhaocong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 6b78764516ea0d79ab0820e35733b06565000329 Author: fengyubiao <[email protected]> AuthorDate: Tue Dec 20 10:37:27 2022 +0800 [fix] [broker] getLastMessageId returns a wrong batch index of last message if enabled read compacted (#18877) ### Motivation The method `consumer.getLastMessageId` returns the ID of the latest message that can be received. - If `read compacted` is disabled, it returns the last confirmed position of the `ManagedLedger`. - If `read compacted` is enabled, it returns the ID of the latest message that can be read from the compacted topic. If we send a batch message like this: ```java producer.newMessage().key("k1").value("v0").sendAsync(); // message-id is [3:1,-1:0] producer.newMessage().key("k1").value("v1").sendAsync(); // message-id is [3:1,-1:1] producer.newMessage().key("k1").value("v2").sendAsync(); // message-id is [3:1,-1:2] producer.newMessage().key("k2").value("v0").sendAsync(); // message-id is [3:1,-1:3] producer.newMessage().key("k2").value("v1").sendAsync(); // message-id is [3:1,-1:4] producer.newMessage().key("k2").value(null).sendAsync(); // message-id is [3:1,-1:5] producer.flush(); ``` After the compaction task is done, all messages with key `k2` will have been removed, because the last message for `k2` has a `null` value, which deletes the key. The latest message that can be received is therefore `[3:1,-1:2]`. --- When we call `consumer.getLastMessageId`, the expected result is: ``` [3:1,-1:2] ``` --- But the actual result is: ``` [3:1,-1:5] ``` ### Modifications If `read compacted` is enabled and the last entry of the compacted topic is a batched message, deserialize the entry, iterate over all of its internal messages, and return the batch index of the last message that is not marked `compacted out`. 
(cherry picked from commit 83993ae91fa0bb845fe84a8ead15f6d9aa26069f) --- .../apache/pulsar/broker/service/ServerCnx.java | 35 +- .../compaction/GetLastMessageIdCompactedTest.java | 375 +++++++++++++++++++++ 2 files changed, 407 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 836bed191cc..9b1273c4bc7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -38,6 +38,7 @@ import io.netty.util.concurrent.FastThreadLocal; import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.ScheduledFuture; import io.prometheus.client.Gauge; +import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Collections; @@ -129,6 +130,7 @@ import org.apache.pulsar.common.api.proto.ProducerAccessMode; import org.apache.pulsar.common.api.proto.ProtocolVersion; import org.apache.pulsar.common.api.proto.Schema; import org.apache.pulsar.common.api.proto.ServerError; +import org.apache.pulsar.common.api.proto.SingleMessageMetadata; import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.intercept.InterceptException; import org.apache.pulsar.common.naming.Metadata; @@ -1832,9 +1834,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { if (entry != null) { // in this case, all the data has been compacted, so return the last position // in the compacted ledger to the client - MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); - int bs = metadata.getNumMessagesInBatch(); - int largestBatchIndex = bs > 0 ? 
bs - 1 : -1; + ByteBuf payload = entry.getDataBuffer(); + MessageMetadata metadata = Commands.parseMessageMetadata(payload); + int largestBatchIndex; + try { + largestBatchIndex = calculateTheLastBatchIndexInBatch(metadata, payload); + } catch (IOException ioEx){ + ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, + "Failed to deserialize batched message from the last entry of the compacted Ledger: " + + ioEx.getMessage())); + return; + } ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex, markDeletePosition != null ? markDeletePosition.getLedgerId() : -1, @@ -1857,6 +1867,25 @@ public class ServerCnx extends PulsarHandler implements TransportCnx { }); } + private int calculateTheLastBatchIndexInBatch(MessageMetadata metadata, ByteBuf payload) throws IOException { + int batchSize = metadata.getNumMessagesInBatch(); + if (batchSize <= 1){ + return -1; + } + SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata(); + int lastBatchIndexInBatch = -1; + for (int i = 0; i < batchSize; i++){ + ByteBuf singleMessagePayload = + Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, i, batchSize); + singleMessagePayload.release(); + if (singleMessageMetadata.isCompactedOut()){ + continue; + } + lastBatchIndexInBatch = i; + } + return lastBatchIndexInBatch; + } + private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName, NamespaceOperation operation) { if (!service.isAuthorizationEnabled()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java new file mode 100644 index 00000000000..0be9fa40754 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.compaction; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.ReaderImpl; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterClass; +import 
org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker-impl") +public class GetLastMessageIdCompactedTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + // Disable the scheduled task: compaction. + conf.setBrokerServiceCompactionMonitorIntervalInSeconds(Integer.MAX_VALUE); + // Disable the scheduled task: retention. + conf.setRetentionCheckIntervalInSeconds(Integer.MAX_VALUE); + } + + private MessageIdImpl getLastMessageIdByTopic(String topicName) throws Exception{ + return (MessageIdImpl) pulsar.getBrokerService().getTopic(topicName, false) + .get().get().getLastMessageId().get(); + } + + private void triggerCompactionAndWait(String topicName) throws Exception { + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get(); + persistentTopic.triggerCompaction(); + Awaitility.await().untilAsserted(() -> { + PositionImpl lastConfirmPos = (PositionImpl) persistentTopic.getManagedLedger().getLastConfirmedEntry(); + PositionImpl markDeletePos = (PositionImpl) persistentTopic + .getSubscription(Compactor.COMPACTION_SUBSCRIPTION).getCursor().getMarkDeletedPosition(); + assertEquals(markDeletePos.getLedgerId(), lastConfirmPos.getLedgerId()); + assertEquals(markDeletePos.getEntryId(), lastConfirmPos.getEntryId()); + }); + } + + private void triggerLedgerSwitch(String topicName) throws Exception{ + admin.topics().unload(topicName); + Awaitility.await().until(() -> { + CompletableFuture<Optional<Topic>> topicFuture = + pulsar.getBrokerService().getTopic(topicName, false); + if (!topicFuture.isDone() || 
topicFuture.isCompletedExceptionally()){ + return false; + } + Optional<Topic> topicOptional = topicFuture.join(); + if (!topicOptional.isPresent()){ + return false; + } + PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get(); + ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + return managedLedger.getState() == ManagedLedgerImpl.State.LedgerOpened; + }); + } + + private void clearAllTheLedgersOutdated(String topicName) throws Exception { + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).get().get(); + ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { + CompletableFuture<Void> future = new CompletableFuture(); + managedLedger.trimConsumedLedgersInBackground(future); + future.join(); + return managedLedger.getLedgersInfo().size() == 1; + }); + } + + @Test + public void testGetLastMessageIdWhenLedgerEmpty() throws Exception { + String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp"); + String subName = "sub"; + Consumer<String> consumer = createConsumer(topicName, subName); + MessageIdImpl messageId = (MessageIdImpl) consumer.getLastMessageId(); + assertEquals(messageId.getLedgerId(), -1); + assertEquals(messageId.getEntryId(), -1); + + // cleanup. 
+ consumer.close(); + admin.topics().delete(topicName, false); + } + + private Producer<String> createProducer(boolean enabledBatch, String topicName) throws Exception { + ProducerBuilder<String> producerBuilder = pulsarClient.newProducer(Schema.STRING) + .topic(topicName) + .enableBatching(enabledBatch); + if (enabledBatch){ + producerBuilder.batchingMaxBytes(Integer.MAX_VALUE) + .batchingMaxPublishDelay(3, TimeUnit.HOURS) + .batchingMaxBytes(Integer.MAX_VALUE); + } + return producerBuilder.create(); + } + + private Consumer<String> createConsumer(String topicName, String subName) throws Exception { + return pulsarClient.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionName(subName) + .receiverQueueSize(1) + .readCompacted(true) + .subscribe(); + } + + @Test + public void testGetLastMessageIdWhenNoNonEmptyLedgerExists() throws Exception { + String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp"); + String subName = "sub"; + ReaderImpl<String> reader = (ReaderImpl<String>) pulsarClient.newReader(Schema.STRING) + .topic(topicName) + .subscriptionName(subName) + .receiverQueueSize(1) + .startMessageId(MessageId.earliest) + .readCompacted(false) + .create(); + + Producer<String> producer = createProducer(false, topicName); + + producer.newMessage().key("k0").value("v0").sendAsync().get(); + reader.readNext(); + triggerLedgerSwitch(topicName); + clearAllTheLedgersOutdated(topicName); + + MessageIdImpl messageId = (MessageIdImpl) reader.getConsumer().getLastMessageId(); + assertEquals(messageId.getLedgerId(), -1); + assertEquals(messageId.getEntryId(), -1); + + // cleanup. 
+ reader.close(); + producer.close(); + admin.topics().delete(topicName, false); + } + + @DataProvider(name = "enabledBatch") + public Object[][] enabledBatch(){ + return new Object[][]{ + {true}, + {false} + }; + } + + @Test(dataProvider = "enabledBatch") + public void testGetLastMessageIdBeforeCompaction(boolean enabledBatch) throws Exception { + String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp"); + String subName = "sub"; + Consumer<String> consumer = createConsumer(topicName, subName); + Producer<String> producer = createProducer(enabledBatch, topicName); + + List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>(); + sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync()); + sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync()); + producer.flush(); + sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync()); + sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync()); + producer.flush(); + FutureUtil.waitForAll(sendFutures).join(); + + MessageIdImpl lastMessageIdExpected = getLastMessageIdByTopic(topicName); + MessageIdImpl lastMessageId = (MessageIdImpl) consumer.getLastMessageId(); + assertEquals(lastMessageId.getLedgerId(), lastMessageIdExpected.getLedgerId()); + assertEquals(lastMessageId.getEntryId(), lastMessageIdExpected.getEntryId()); + if (enabledBatch){ + BatchMessageIdImpl lastBatchMessageIdByTopic = (BatchMessageIdImpl) lastMessageIdExpected; + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId; + assertEquals(batchMessageId.getBatchSize(), lastBatchMessageIdByTopic.getBatchSize()); + assertEquals(batchMessageId.getBatchIndex(), lastBatchMessageIdByTopic.getBatchIndex()); + } + + // cleanup. 
+ consumer.close(); + producer.close(); + admin.topics().delete(topicName, false); + } + + @Test(dataProvider = "enabledBatch") + public void testGetLastMessageIdAfterCompaction(boolean enabledBatch) throws Exception { + String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp"); + String subName = "sub"; + Consumer<String> consumer = createConsumer(topicName, subName); + Producer<String> producer = createProducer(enabledBatch, topicName); + + List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>(); + sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync()); + sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync()); + producer.flush(); + sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync()); + sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync()); + producer.flush(); + FutureUtil.waitForAll(sendFutures).join(); + + triggerCompactionAndWait(topicName); + + MessageIdImpl lastMessageIdByTopic = getLastMessageIdByTopic(topicName); + MessageIdImpl messageId = (MessageIdImpl) consumer.getLastMessageId(); + assertEquals(messageId.getLedgerId(), lastMessageIdByTopic.getLedgerId()); + assertEquals(messageId.getEntryId(), lastMessageIdByTopic.getEntryId()); + if (enabledBatch){ + BatchMessageIdImpl lastBatchMessageIdByTopic = (BatchMessageIdImpl) lastMessageIdByTopic; + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId; + assertEquals(batchMessageId.getBatchSize(), lastBatchMessageIdByTopic.getBatchSize()); + assertEquals(batchMessageId.getBatchIndex(), lastBatchMessageIdByTopic.getBatchIndex()); + } + + // cleanup. 
+ consumer.close(); + producer.close(); + admin.topics().delete(topicName, false); + } + + @Test(dataProvider = "enabledBatch") + public void testGetLastMessageIdAfterCompactionEndWithNullMsg(boolean enabledBatch) throws Exception { + String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp"); + String subName = "sub"; + Consumer<String> consumer = createConsumer(topicName, subName); + Producer<String> producer = createProducer(enabledBatch, topicName); + + List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>(); + sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync()); + sendFutures.add(producer.newMessage().key("k0").value("v2").sendAsync()); + producer.flush(); + sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync()); + sendFutures.add(producer.newMessage().key("k1").value(null).sendAsync()); + sendFutures.add(producer.newMessage().key("k2").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k2").value("v1").sendAsync()); + sendFutures.add(producer.newMessage().key("k2").value(null).sendAsync()); + producer.flush(); + FutureUtil.waitForAll(sendFutures).join(); + + triggerCompactionAndWait(topicName); + + MessageIdImpl lastMessageIdExpected = (MessageIdImpl) sendFutures.get(2).get(); + MessageIdImpl lastMessageId = (MessageIdImpl) consumer.getLastMessageId(); + assertEquals(lastMessageId.getLedgerId(), lastMessageIdExpected.getLedgerId()); + assertEquals(lastMessageId.getEntryId(), lastMessageIdExpected.getEntryId()); + if (enabledBatch){ + BatchMessageIdImpl lastBatchMessageIdExpected = (BatchMessageIdImpl) lastMessageIdExpected; + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId; + assertEquals(batchMessageId.getBatchSize(), lastBatchMessageIdExpected.getBatchSize()); + 
assertEquals(batchMessageId.getBatchIndex(), lastBatchMessageIdExpected.getBatchIndex()); + } + + // cleanup. + consumer.close(); + producer.close(); + admin.topics().delete(topicName, false); + } + + @Test(dataProvider = "enabledBatch") + public void testGetLastMessageIdAfterCompactionEndWithNullMsg2(boolean enabledBatch) throws Exception { + String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp"); + String subName = "sub"; + Consumer<String> consumer = createConsumer(topicName, subName); + Producer<String> producer = createProducer(enabledBatch, topicName); + + List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>(); + sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k0").value("v1").sendAsync()); + producer.flush(); + sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k1").value("v1").sendAsync()); + sendFutures.add(producer.newMessage().key("k1").value("v2").sendAsync()); + sendFutures.add(producer.newMessage().key("k2").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k2").value("v1").sendAsync()); + sendFutures.add(producer.newMessage().key("k2").value(null).sendAsync()); + producer.flush(); + FutureUtil.waitForAll(sendFutures).join(); + + triggerCompactionAndWait(topicName); + + MessageIdImpl lastMessageIdExpected = (MessageIdImpl) sendFutures.get(4).get(); + MessageIdImpl lastMessageId = (MessageIdImpl) consumer.getLastMessageId(); + assertEquals(lastMessageId.getLedgerId(), lastMessageIdExpected.getLedgerId()); + assertEquals(lastMessageId.getEntryId(), lastMessageIdExpected.getEntryId()); + if (enabledBatch){ + BatchMessageIdImpl lastBatchMessageIdExpected = (BatchMessageIdImpl) lastMessageIdExpected; + BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) lastMessageId; + assertEquals(batchMessageId.getBatchSize(), 
lastBatchMessageIdExpected.getBatchSize()); + assertEquals(batchMessageId.getBatchIndex(), lastBatchMessageIdExpected.getBatchIndex()); + } + + // cleanup. + consumer.close(); + producer.close(); + admin.topics().delete(topicName, false); + } + + @Test(dataProvider = "enabledBatch") + public void testGetLastMessageIdAfterCompactionAllNullMsg(boolean enabledBatch) throws Exception { + String topicName = "persistent://public/default/" + BrokerTestUtil.newUniqueName("tp"); + String subName = "sub"; + Consumer<String> consumer = createConsumer(topicName, subName); + Producer<String> producer = createProducer(enabledBatch, topicName); + + List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>(); + sendFutures.add(producer.newMessage().key("k0").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k0").value(null).sendAsync()); + producer.flush(); + sendFutures.add(producer.newMessage().key("k1").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k1").value(null).sendAsync()); + sendFutures.add(producer.newMessage().key("k2").value("v0").sendAsync()); + sendFutures.add(producer.newMessage().key("k2").value(null).sendAsync()); + producer.flush(); + FutureUtil.waitForAll(sendFutures).join(); + + triggerCompactionAndWait(topicName); + + MessageIdImpl lastMessageId = (MessageIdImpl) consumer.getLastMessageId(); + assertFalse(lastMessageId instanceof BatchMessageIdImpl); + assertEquals(lastMessageId.getLedgerId(), -1); + assertEquals(lastMessageId.getEntryId(), -1); + + // cleanup. + consumer.close(); + producer.close(); + admin.topics().delete(topicName, false); + } +}
