This is an automated email from the ASF dual-hosted git repository.
heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 16ea0d88ee5 [fix] [broker][branch-3.0] Expire messages according to
ledger close time to avoid client clock skew (#21940) (#22211)
16ea0d88ee5 is described below
commit 16ea0d88ee50dc9267c2da5a31e0e8b4f0d958d0
Author: Heesung Sohn <[email protected]>
AuthorDate: Wed Mar 6 17:53:05 2024 -0800
[fix] [broker][branch-3.0] Expire messages according to ledger close time
to avoid client clock skew (#21940) (#22211)
Co-authored-by: feynmanlin <[email protected]>
---
.../persistent/PersistentMessageExpiryMonitor.java | 35 +++++++++++++++++++++-
.../service/PersistentMessageFinderTest.java | 25 +++++++++++++++-
2 files changed, 58 insertions(+), 2 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 41bc6098e1a..d82f3d93f8f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
import com.google.common.annotations.VisibleForTesting;
import java.util.Objects;
import java.util.Optional;
+import java.util.SortedMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
@@ -30,8 +31,10 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.LedgerNotExistException;
import
org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.protocol.Commands;
@@ -78,7 +81,9 @@ public class PersistentMessageExpiryMonitor implements
FindEntryCallback {
if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE))
{
log.info("[{}][{}] Starting message expiry check, ttl= {}
seconds", topicName, subName,
messageTTLInSeconds);
-
+ // First, expire whole ledgers that have reached TTL based on the ledger
close time, to avoid client clock skew.
+ checkExpiryByLedgerClosureTime(cursor, messageTTLInSeconds);
+ // Some entries in the active ledger may also have reached TTL, so
we still need to continue searching.
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
entry -> {
try {
long entryTimestamp =
Commands.getEntryTimestamp(entry.getDataBuffer());
@@ -99,6 +104,34 @@ public class PersistentMessageExpiryMonitor implements
FindEntryCallback {
return false;
}
}
+ private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int
messageTTLInSeconds) {
+ if (messageTTLInSeconds <= 0) {
+ return;
+ }
+ if (cursor instanceof ManagedCursorImpl managedCursor) {
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)
managedCursor.getManagedLedger();
+ Position deletedPosition = managedCursor.getMarkDeletedPosition();
+ SortedMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
ledgerInfoSortedMap =
+
managedLedger.getLedgersInfo().subMap(deletedPosition.getLedgerId(), true,
+ managedLedger.getLedgersInfo().lastKey(), true);
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo info = null;
+ for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo :
ledgerInfoSortedMap.values()) {
+ if (!ledgerInfo.hasTimestamp() ||
!MessageImpl.isEntryExpired(messageTTLInSeconds,
+ ledgerInfo.getTimestamp())) {
+ break;
+ }
+ info = ledgerInfo;
+ }
+ if (info != null && info.getLedgerId() > -1) {
+ PositionImpl position = PositionImpl.get(info.getLedgerId(),
info.getEntries() - 1);
+ if (((PositionImpl)
managedLedger.getLastConfirmedEntry()).compareTo(position) < 0) {
+ findEntryComplete(managedLedger.getLastConfirmedEntry(),
null);
+ } else {
+ findEntryComplete(position, null);
+ }
+ }
+ }
+ }
public boolean expireMessages(Position messagePosition) {
// If it's beyond last position of this topic, do nothing.
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index e77fd07c6ef..d56968c8f8e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -80,8 +80,11 @@ import javax.ws.rs.core.MediaType;
public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {
public static byte[] createMessageWrittenToLedger(String msg) {
+ return createMessageWrittenToLedger(msg, System.currentTimeMillis());
+ }
+ public static byte[] createMessageWrittenToLedger(String msg, long
messageTimestamp) {
MessageMetadata messageMetadata = new MessageMetadata()
- .setPublishTime(System.currentTimeMillis())
+ .setPublishTime(messageTimestamp)
.setProducerName("createMessageWrittenToLedger")
.setSequenceId(1);
ByteBuf data =
UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeBytes(msg.getBytes());
@@ -428,6 +431,26 @@ public class PersistentMessageFinderTest extends
MockedBookKeeperTestCase {
}
+ @Test
+ public void testIncorrectClientClock() throws Exception {
+ final String ledgerAndCursorName = "testIncorrectClientClock";
+ int maxTTLSeconds = 1;
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(1);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl)
factory.open(ledgerAndCursorName, config);
+ ManagedCursorImpl c1 = (ManagedCursorImpl)
ledger.openCursor(ledgerAndCursorName);
+ // Simulate a client clock that is 10 days ahead of the current time.
+ long incorrectPublishTimestamp = System.currentTimeMillis() +
TimeUnit.DAYS.toMillis(10);
+ for (int i = 0; i < 10; i++) {
+ ledger.addEntry(createMessageWrittenToLedger("msg" + i,
incorrectPublishTimestamp));
+ }
+ assertEquals(ledger.getLedgersInfoAsList().size(), 10);
+ PersistentMessageExpiryMonitor monitor = new
PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
+ Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds));
+ monitor.expireMessages(maxTTLSeconds);
+ assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
+ }
+
@Test
void testMessageExpiryWithPosition() throws Exception {
final String ledgerAndCursorName =
"testPersistentMessageExpiryWithPositionNonRecoverableLedgers";