This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit aa74f0c63e03dfc842b22884762834459562d560 Author: Enrico Olivelli <[email protected]> AuthorDate: Mon Dec 7 14:16:43 2020 +0100 Issue 8677: Cannot get lastMessageId for an empty topic due to message retention (#8725) When we are trimming the ledgers we are saving the `currentLedger` but as soon as your restart the broker the currentLedger is not containing the lastMessageId (because it is a fresh new ledger). Changes: - add test case on pulsar-broker that reproduces the issue reported but the user - log a message when we are trimming the ledger at lastAddConfirmedEntry - add test case that prevent changes in the future on ManagedLedgerImpl - fix a minor issue in PersistentTopic#getLastMessageId, a return keyword was missing and we continued with a call ti ManagedLedger, the CompletableFuture was already 'completed' so the final result is not changed (but we are saving resources) Fixes #8677 (cherry picked from commit 5054642966adc97808ca8e97a4e22170e5964b0b) --- .gitignore | 3 + .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 7 +- .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 27 ++++++++ .../broker/service/persistent/PersistentTopic.java | 15 +++- .../broker/service/ConsumedLedgersTrimTest.java | 80 ++++++++++++++++++++++ 5 files changed, 129 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 297f31d..176d1d2 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,9 @@ pulsar-functions/worker/src/test/resources/ *.iml *.iws +# NetBeans +nb-configuration.xml + # Mac **/.DS_Store diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c58a1a5..0c69147 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2072,7 +2072,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (log.isDebugEnabled()) { log.debug("[{}] Slowest consumer ledger id: {}", name, slowestReaderLedgerId); } - // skip ledger if retention constraint met for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) { boolean expired = hasLedgerRetentionExpired(ls.getTimestamp()); @@ -2123,8 +2122,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { advanceNonDurableCursors(ledgersToDelete); + PositionImpl currentLastConfirmedEntry = lastConfirmedEntry; // Update metadata for (LedgerInfo ls : ledgersToDelete) { + if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) { + // this info is relevant because the lastMessageId won't be available anymore + log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be deleted", name, + ls.getLedgerId(), currentLastConfirmedEntry); + } ledgerCache.remove(ls.getLedgerId()); ledgers.remove(ls.getLedgerId()); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 56f215c..e6388c4 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -1854,6 +1854,33 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { ml.close(); } + /** + * Set retention time = 0 and create a empty ledger, + * first position can't higher than last after trim ledgers. + * Even if we do not have subscriptions the ledger + * that contains the lastConfirmedEntry will be deleted anyway. + */ + @Test + public void testRetention0WithEmptyLedgerWithoutCursors() throws Exception { + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionTime(0, TimeUnit.MINUTES); + config.setMaxEntriesPerLedger(1); + + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("deletion_after_retention_test_ledger", config); + ml.addEntry("message1".getBytes()); + ml.close(); + + // reopen ml + ml = (ManagedLedgerImpl) factory.open("deletion_after_retention_test_ledger", config); + ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null)); + + assertTrue(ml.getFirstPosition().ledgerId <= ml.lastConfirmedEntry.ledgerId); + assertFalse(ml.getLedgersInfo().containsKey(ml.lastConfirmedEntry.ledgerId), + "the ledger at lastConfirmedEntry has not been trimmed!"); + ml.close(); + } + @Test public void testInfiniteRetention() throws Exception { ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 33b49ac..02fdf74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2206,12 +2206,23 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal public CompletableFuture<MessageId> getLastMessageId() { CompletableFuture<MessageId> completableFuture = new CompletableFuture<>(); PositionImpl position = (PositionImpl) ledger.getLastConfirmedEntry(); - int partitionIndex = TopicName.getPartitionIndex(getName()); + String name = getName(); + int partitionIndex = TopicName.getPartitionIndex(name); + if (log.isDebugEnabled()) { + log.debug("getLastMessageId {}, partitionIndex{}, position {}", name, partitionIndex, position); + } if (position.getEntryId() == -1) { completableFuture .complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex)); + return completableFuture; + } + ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger; + if (!ledgerImpl.ledgerExists(position.getLedgerId())) { + completableFuture + .complete(MessageId.earliest); + return completableFuture; } - ((ManagedLedgerImpl) ledger).asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { + ledgerImpl.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() { @Override public void readEntryComplete(Entry entry, Object ctx) { PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java index 72c2eff..cc84de2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumedLedgersTrimTest.java @@ -19,6 +19,9 @@ package org.apache.pulsar.broker.service; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import java.util.concurrent.CompletableFuture; import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -30,8 +33,14 @@ import org.junit.Test; import org.testng.Assert; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.MessageId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ConsumedLedgersTrimTest extends BrokerTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(ConsumedLedgersTrimTest.class); + @Override protected void setup() throws Exception { //No-op @@ -90,4 +99,75 @@ public class ConsumedLedgersTrimTest extends BrokerTestBase { Thread.sleep(1500); Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1); } + + + @Test + public void TestConsumedLedgersTrimNoSubscriptions() throws Exception { + conf.setRetentionCheckIntervalInSeconds(1); + conf.setBrokerDeleteInactiveTopicsEnabled(false); + super.baseSetup(); + final String topicName = "persistent://prop/ns-abc/TestConsumedLedgersTrimNoSubscriptions"; + + // write some messages + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(topicName) + .producerName("producer-name") + .create(); + + // set retention parameters, the ledgers are to be deleted as soon as possible + // but the topic is not to be automatically deleted + PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); + ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig(); + managedLedgerConfig.setRetentionSizeInMB(-1); + managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS); + managedLedgerConfig.setMaxEntriesPerLedger(1000); + managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + MessageId initialMessageId = persistentTopic.getLastMessageId().get(); + LOG.info("lastmessageid " + initialMessageId); + + int msgNum = 7; + for (int i = 0; i < msgNum; i++) { + producer.send(new byte[1024 * 1024]); + } + + ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1); + MessageId messageIdBeforeRestart = pulsar.getAdminClient().topics().getLastMessageId(topicName); + LOG.info("messageIdBeforeRestart " + messageIdBeforeRestart); + assertNotEquals(messageIdBeforeRestart, initialMessageId); + + // restart the broker we have to start a new ledger + // the lastMessageId is still on the previous ledger + restartBroker(); + // force load topic + pulsar.getAdminClient().topics().getStats(topicName); + MessageId messageIdAfterRestart = pulsar.getAdminClient().topics().getLastMessageId(topicName); + LOG.info("lastmessageid " + messageIdAfterRestart); + assertEquals(messageIdAfterRestart, messageIdBeforeRestart); + + persistentTopic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); + managedLedgerConfig = persistentTopic.getManagedLedger().getConfig(); + managedLedgerConfig.setRetentionSizeInMB(-1); + managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS); + managedLedgerConfig.setMaxEntriesPerLedger(1); + managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + managedLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + // now we have two ledgers, the first is expired but is contains the lastMessageId + // the second is empty and should be kept as it is the current tail + Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 2); + + // force trimConsumedLedgers + Thread.sleep(3000); + CompletableFuture f = new CompletableFuture(); + managedLedger.trimConsumedLedgersInBackground(f); + f.join(); + + // lastMessageId should be available even in this case, but is must + // refer to -1 + MessageId messageIdAfterTrim = pulsar.getAdminClient().topics().getLastMessageId(topicName); + LOG.info("lastmessageid " + messageIdAfterTrim); + assertEquals(messageIdAfterTrim, MessageId.earliest); + + } }
