This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f56c383a5312ef9186880662860b8d2f38b3c0c8 Author: Cong Zhao <[email protected]> AuthorDate: Thu Mar 28 03:42:15 2024 +0800 [fix][broker] Avoid expired unclosed ledgers when checking expired messages by ledger closure time (#22335) (cherry picked from commit f77fe5f099f7ecc334509db07bba477c4226cf19) --- .../persistent/PersistentMessageExpiryMonitor.java | 4 +- .../service/PersistentMessageFinderTest.java | 51 +++++++++++++++++++--- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index ac391c10503..2478a7a2538 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -121,8 +121,8 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag managedLedger.getLedgersInfo().lastKey(), true); MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null; for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) { - if (!ledgerInfo.hasTimestamp() || !MessageImpl.isEntryExpired(messageTTLInSeconds, - ledgerInfo.getTimestamp())) { + if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() == 0L + || !MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) { break; } info = ledgerInfo; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index ace552a55a7..6883c0467e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -33,10 +33,8 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; - import io.netty.buffer.ByteBuf; import io.netty.buffer.UnpooledByteBufAllocator; - import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashSet; @@ -46,7 +44,9 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; - +import java.util.concurrent.atomic.AtomicReference; +import javax.ws.rs.client.Entity; +import javax.ws.rs.core.MediaType; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; @@ -72,11 +73,10 @@ import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.Test; -import javax.ws.rs.client.Entity; -import javax.ws.rs.core.MediaType; - @Test(groups = "broker") public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { @@ -463,6 +463,45 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); } + @Test + public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Throwable { + final String ledgerAndCursorName = "testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger"; + int maxTTLSeconds = 1; + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(5); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + // set client clock to 10 days later + long incorrectPublishTimestamp = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(10); + for (int i = 0; i < 7; i++) { + ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp)); + } + assertEquals(ledger.getLedgersInfoAsList().size(), 2); + PersistentTopic mock = mock(PersistentTopic.class); + when(mock.getName()).thenReturn("topicname"); + when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); + AsyncCallbacks.MarkDeleteCallback markDeleteCallback = + (AsyncCallbacks.MarkDeleteCallback) spy( + FieldUtils.readDeclaredField(monitor, "markDeleteCallback", true)); + FieldUtils.writeField(monitor, "markDeleteCallback", markDeleteCallback, true); + + AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>(); + Mockito.doAnswer(invocation -> { + ManagedLedgerException argument = invocation.getArgument(0, ManagedLedgerException.class); + throwableAtomicReference.set(argument); + return invocation.callRealMethod(); + }).when(markDeleteCallback).markDeleteFailed(any(), any()); + + PositionImpl position = (PositionImpl) ledger.getLastConfirmedEntry(); + c1.markDelete(position); + Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds)); + monitor.expireMessages(maxTTLSeconds); + assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); + + Assert.assertNull(throwableAtomicReference.get()); + } + @Test void testMessageExpiryWithPosition() throws Exception { final String ledgerAndCursorName = "testPersistentMessageExpiryWithPositionNonRecoverableLedgers";
