This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d2b2942d8e01f78e2f991ea3997ee3613ad9d1c1 Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Fri Aug 8 12:46:17 2025 +0800 [improve][broker]Remove block calling that named cursor.asyncGetNth when expiring messages (#24606) (cherry picked from commit d275bd4004ab4d11e9527e10c3069126689ca10a) --- .../pulsar/broker/service/MessageExpirer.java | 3 + .../nonpersistent/NonPersistentSubscription.java | 6 ++ .../persistent/PersistentMessageExpiryMonitor.java | 6 ++ .../service/persistent/PersistentReplicator.java | 17 ++++ .../service/persistent/PersistentSubscription.java | 20 ++++ .../broker/service/persistent/PersistentTopic.java | 41 +++++++- .../service/PersistentMessageFinderTest.java | 110 +++++++++++++++++++++ .../pulsar/broker/service/ReplicatorTest.java | 30 ++++++ 8 files changed, 230 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java index 7cb1d2a904a..3008717a3df 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.common.classification.InterfaceStability; @@ -27,4 +28,6 @@ public interface MessageExpirer { boolean expireMessages(Position position); boolean expireMessages(int messageTTLInSeconds); + + CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java index cfe05cc32b7..b9083d68046 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java @@ -472,6 +472,12 @@ public class NonPersistentSubscription extends AbstractSubscription { + " non-persistent topic."); } + @Override + public CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds) { + return CompletableFuture.failedFuture(new UnsupportedOperationException("Expire message by timestamp is not" + + " supported for non-persistent topic.")); + } + @Override public boolean expireMessages(Position position) { throw new UnsupportedOperationException("Expire message by position is not supported for" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 7a18f4abe47..93b03145cad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Objects; import java.util.Optional; import java.util.SortedMap; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; @@ -80,6 +81,11 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData(); } + @Override + public CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds) { + return CompletableFuture.supplyAsync(() -> expireMessages(messageTTLInSeconds), topic.getOrderedExecutor()); + } + @Override public boolean expireMessages(int messageTTLInSeconds) { if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index cf3a71f7d73..fffbb1b6d3d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -685,6 +685,23 @@ public abstract class PersistentReplicator extends AbstractReplicator return expiryMonitor.expireMessages(messageTTLInSeconds); } + @Override + public CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds) { + long backlog = cursor.getNumberOfEntriesInBacklog(false); + if (backlog == 0) { + return CompletableFuture.completedFuture(false); + } else if (backlog < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK) { + return topic.isOldestMessageExpiredAsync(cursor, messageTTLInSeconds).thenCompose(oldestMsgExpired -> { + if (oldestMsgExpired) { + return expiryMonitor.expireMessagesAsync(messageTTLInSeconds); + } else { + return CompletableFuture.completedFuture(false); + } + }); + } + return expiryMonitor.expireMessagesAsync(messageTTLInSeconds); + } + @Override public boolean expireMessages(Position position) { return expiryMonitor.expireMessages(position); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 86c4251f371..b3834017395 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1208,6 +1208,26 @@ public class PersistentSubscription extends AbstractSubscription { return expiryMonitor.expireMessages(messageTTLInSeconds); } + @Override + public CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds) { + long backlog = getNumberOfEntriesInBacklog(false); + if (backlog == 0) { + return CompletableFuture.completedFuture(false); + } + if (dispatcher != null && dispatcher.isConsumerConnected() && backlog < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK) { + return topic.isOldestMessageExpiredAsync(cursor, messageTTLInSeconds) + .thenCompose(oldestMsgExpired -> { + if (oldestMsgExpired) { + this.lastExpireTimestamp = System.currentTimeMillis(); + return expiryMonitor.expireMessagesAsync(messageTTLInSeconds); + } else { + return CompletableFuture.completedFuture(false); + } + }); + } + return expiryMonitor.expireMessagesAsync(messageTTLInSeconds); + } + @Override public boolean expireMessages(Position position) { this.lastExpireTimestamp = System.currentTimeMillis(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2b7221600e9..d513c62ecb5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -30,6 +30,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; import io.netty.buffer.ByteBuf; import io.netty.util.concurrent.FastThreadLocal; +import java.io.IOException; import java.time.Clock; import java.util.ArrayList; import java.util.Collections; @@ -2122,13 +2123,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal if (!isCompactionSubscription(sub.getName()) && (additionalSystemCursorNames.isEmpty() || !additionalSystemCursorNames.contains(sub.getName()))) { - sub.expireMessages(messageTtlInSeconds); + sub.expireMessagesAsync(messageTtlInSeconds); } }); replicators.forEach((__, replicator) - -> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds)); + -> ((PersistentReplicator) replicator).expireMessagesAsync(messageTtlInSeconds)); shadowReplicators.forEach((__, replicator) - -> ((PersistentReplicator) replicator).expireMessages(messageTtlInSeconds)); + -> ((PersistentReplicator) replicator).expireMessagesAsync(messageTtlInSeconds)); } } @@ -3931,6 +3932,40 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal return isOldestMessageExpired; } + public CompletableFuture<Boolean> isOldestMessageExpiredAsync(ManagedCursor cursor, int messageTTLInSeconds) { + CompletableFuture<Boolean> res = new CompletableFuture<>(); + cursor.asyncGetNthEntry(1, IndividualDeletedEntries.Include, new AsyncCallbacks.ReadEntryCallback() { + + @Override + public void readEntryComplete(Entry entry, Object ctx) { + long entryTimestamp = 0; + try { + entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); + res.complete(MessageImpl.isEntryExpired( + (int) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD), entryTimestamp)); + } catch (IOException e) { + log.warn("[{}] [{}] Error while getting the oldest message", topic, cursor.toString(), e); + res.complete(false); + } + + } + + @Override + public void readEntryFailed(ManagedLedgerException e, Object ctx) { + if (brokerService.pulsar().getConfiguration().isAutoSkipNonRecoverableData() + && e instanceof NonRecoverableLedgerException) { + // NonRecoverableLedgerException means the ledger or entry can't be read anymore. + // if AutoSkipNonRecoverableData is set to true, just return true here. + res.complete(true); + } else { + log.warn("[{}] [{}] Error while getting the oldest message", topic, cursor.toString(), e); + res.complete(false); + } + } + }, null); + return res; + } + /** * Clears backlog for all cursors in the topic. * diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 93b061f8420..135829160de 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -61,6 +62,8 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.Ledge import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -450,6 +453,62 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { } + /** + * It tests that message expiry doesn't get stuck if it can't read deleted ledger's entry. + */ + @Test + void testMessageExpiryAsyncWithTimestampNonRecoverableException() throws Exception { + + final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers"; + final int entriesPerLedger = 2; + final int totalEntries = 10; + final int ttlSeconds = 1; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(10); + config.setMaxEntriesPerLedger(entriesPerLedger); + config.setRetentionTime(1, TimeUnit.HOURS); + config.setAutoSkipNonRecoverableData(true); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + + for (int i = 0; i < totalEntries; i++) { + ledger.addEntry(createMessageWrittenToLedger("msg" + i)); + } + Awaitility.await().untilAsserted(() -> + assertEquals(ledger.getState(), ManagedLedgerImpl.State.LedgerOpened)); + + List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList(); + LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1); + // The `lastLedgerInfo` should be newly opened, and it does not contain any entries. + // Please refer to: https://github.com/apache/pulsar/pull/22034 + assertEquals(lastLedgerInfo.getEntries(), 0); + assertEquals(ledgers.size(), totalEntries / entriesPerLedger + 1); + + // this will make sure that all entries should be deleted + Thread.sleep(TimeUnit.SECONDS.toMillis(ttlSeconds)); + + bkc.deleteLedger(ledgers.get(0).getLedgerId()); + bkc.deleteLedger(ledgers.get(1).getLedgerId()); + bkc.deleteLedger(ledgers.get(2).getLedgerId()); + + PersistentTopic mock = mockPersistentTopic("topicname"); + + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); + assertTrue(monitor.expireMessagesAsync(ttlSeconds).get()); + Awaitility.await().untilAsserted(() -> { + Position markDeletePosition = c1.getMarkDeletedPosition(); + // The markDeletePosition points to the last entry of the previous ledger in lastLedgerInfo. + assertEquals(markDeletePosition.getLedgerId(), lastLedgerInfo.getLedgerId() - 1); + assertEquals(markDeletePosition.getEntryId(), entriesPerLedger - 1); + }); + + c1.close(); + ledger.close(); + factory.shutdown(); + + } + public void testFindMessageWithTimestampAutoSkipNonRecoverable() throws Exception { final String ledgerAndCursorName = "testFindMessageWithTimestampAutoSkipNonRecoverable"; @@ -625,6 +684,20 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); } + private PersistentTopic mockPersistentTopic(String topicName) throws Exception { + PersistentTopic mock = mock(PersistentTopic.class); + when(mock.getName()).thenReturn("topicname"); + when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); + BrokerService brokerService = mock(BrokerService.class); + doReturn(brokerService).when(mock).getBrokerService(); + doReturn(executor).when(mock).getOrderedExecutor(); + PulsarService pulsarService = mock(PulsarService.class); + doReturn(pulsarService).when(brokerService).pulsar(); + ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); + doReturn(serviceConfiguration).when(pulsarService).getConfig(); + return mock; + } + @Test public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Throwable { final String ledgerAndCursorName = "testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger"; @@ -664,6 +737,43 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { Assert.assertNull(throwableAtomicReference.get()); } + @Test + public void testCheckExpiryAsyncByLedgerClosureTimeWithAckUnclosedLedger() throws Throwable { + final String ledgerAndCursorName = "testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger"; + int maxTTLSeconds = 1; + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(5); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + // set client clock to 10 days later + long incorrectPublishTimestamp = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(10); + for (int i = 0; i < 7; i++) { + ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp)); + } + assertEquals(ledger.getLedgersInfoAsList().size(), 2); + PersistentTopic mock = mockPersistentTopic("topicname"); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); + AsyncCallbacks.MarkDeleteCallback markDeleteCallback = + (AsyncCallbacks.MarkDeleteCallback) spy( + FieldUtils.readDeclaredField(monitor, "markDeleteCallback", true)); + FieldUtils.writeField(monitor, "markDeleteCallback", markDeleteCallback, true); + + AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>(); + Mockito.doAnswer(invocation -> { + ManagedLedgerException argument = invocation.getArgument(0, ManagedLedgerException.class); + throwableAtomicReference.set(argument); + return invocation.callRealMethod(); + }).when(markDeleteCallback).markDeleteFailed(any(), any()); + + Position position = ledger.getLastConfirmedEntry(); + c1.markDelete(position); + Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds)); + monitor.expireMessagesAsync(maxTTLSeconds).get(); + assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); + + Assert.assertNull(throwableAtomicReference.get()); + } + @Test void testMessageExpiryWithPosition() throws Exception { final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index eefcc0a9f04..cd078fd89f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -668,6 +668,36 @@ public class ReplicatorTest extends ReplicatorTestBase { assertEquals(status.getReplicationBacklog(), 0); } + @Test(timeOut = 30000) + public void testReplicatorExpireMsgAsync() throws Exception { + + // This test is to verify that reset cursor fails on global topic + SortedSet<String> testDests = new TreeSet<>(); + + final TopicName dest = TopicName + .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic")); + testDests.add(dest.toString()); + + @Cleanup + MessageProducer producer1 = new MessageProducer(url1, dest); + + @Cleanup + MessageConsumer consumer1 = new MessageConsumer(url3, dest); + + // Produce from cluster1 and consume from the rest + producer1.produce(2); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); + PersistentReplicator replicator = (PersistentReplicator) spy( + topic.getReplicators().get(topic.getReplicators().keys().stream().toList().get(0))); + replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null); + replicator.clearBacklog().get(); + Thread.sleep(100); + replicator.updateRates(); // for code-coverage + replicator.expireMessagesAsync(1).get(); // for code-coverage + ReplicatorStats status = replicator.getStats(); + assertEquals(status.getReplicationBacklog(), 0); + } + @Test(timeOut = 30000) public void testResetReplicatorSubscriptionPosition() throws Exception {