This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f77fe5f099f [fix][broker] Avoid expired unclosed ledgers when checking
expired messages by ledger closure time (#22335)
f77fe5f099f is described below
commit f77fe5f099f7ecc334509db07bba477c4226cf19
Author: Cong Zhao <[email protected]>
AuthorDate: Thu Mar 28 03:42:15 2024 +0800
[fix][broker] Avoid expired unclosed ledgers when checking expired messages
by ledger closure time (#22335)
---
.../persistent/PersistentMessageExpiryMonitor.java | 4 +-
.../service/PersistentMessageFinderTest.java | 51 +++++++++++++++++++---
2 files changed, 47 insertions(+), 8 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index ac391c10503..2478a7a2538 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -121,8 +121,8 @@ public class PersistentMessageExpiryMonitor implements
FindEntryCallback, Messag
managedLedger.getLedgersInfo().lastKey(), true);
MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null;
for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo :
ledgerInfoSortedMap.values()) {
- if (!ledgerInfo.hasTimestamp() ||
!MessageImpl.isEntryExpired(messageTTLInSeconds,
- ledgerInfo.getTimestamp())) {
+ if (!ledgerInfo.hasTimestamp() || ledgerInfo.getTimestamp() ==
0L
+ || !MessageImpl.isEntryExpired(messageTTLInSeconds,
ledgerInfo.getTimestamp())) {
break;
}
info = ledgerInfo;
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index ace552a55a7..6883c0467e4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -33,10 +33,8 @@ import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
-
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
@@ -46,7 +44,9 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
+import java.util.concurrent.atomic.AtomicReference;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.MediaType;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
@@ -59,6 +59,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import
org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
+import org.apache.commons.lang3.reflect.FieldUtils;
import
org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
@@ -72,11 +73,10 @@ import
org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.Test;
-import javax.ws.rs.client.Entity;
-import javax.ws.rs.core.MediaType;
-
@Test(groups = "broker")
public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
@@ -463,6 +463,45 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
}
+ @Test
+ public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger()
throws Throwable {
+ final String ledgerAndCursorName =
"testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger";
+ int maxTTLSeconds = 1;
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(5);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open(ledgerAndCursorName, config);
+ ManagedCursorImpl c1 = (ManagedCursorImpl)
ledger.openCursor(ledgerAndCursorName);
+ // set client clock to 10 days later
+ long incorrectPublishTimestamp = System.currentTimeMillis() +
TimeUnit.DAYS.toMillis(10);
+ for (int i = 0; i < 7; i++) {
+ ledger.addEntry(createMessageWrittenToLedger("msg" + i,
incorrectPublishTimestamp));
+ }
+ assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+ PersistentTopic mock = mock(PersistentTopic.class);
+ when(mock.getName()).thenReturn("topicname");
+ when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
+ PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+ AsyncCallbacks.MarkDeleteCallback markDeleteCallback =
+ (AsyncCallbacks.MarkDeleteCallback) spy(
+ FieldUtils.readDeclaredField(monitor,
"markDeleteCallback", true));
+ FieldUtils.writeField(monitor, "markDeleteCallback",
markDeleteCallback, true);
+
+ AtomicReference<Throwable> throwableAtomicReference = new
AtomicReference<>();
+ Mockito.doAnswer(invocation -> {
+ ManagedLedgerException argument = invocation.getArgument(0,
ManagedLedgerException.class);
+ throwableAtomicReference.set(argument);
+ return invocation.callRealMethod();
+ }).when(markDeleteCallback).markDeleteFailed(any(), any());
+
+ PositionImpl position = (PositionImpl) ledger.getLastConfirmedEntry();
+ c1.markDelete(position);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds));
+ monitor.expireMessages(maxTTLSeconds);
+ assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
+
+ Assert.assertNull(throwableAtomicReference.get());
+ }
+
@Test
void testMessageExpiryWithPosition() throws Exception {
final String ledgerAndCursorName =
"testPersistentMessageExpiryWithPositionNonRecoverableLedgers";