codelipenghui commented on code in PR #21940:
URL: https://github.com/apache/pulsar/pull/21940#discussion_r1467418275
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -100,6 +105,27 @@ public boolean expireMessages(int messageTTLInSeconds) {
}
}
+ private boolean checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTLInSeconds) {
+ if (cursor instanceof ManagedCursorImpl managedCursor) {
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) managedCursor.getManagedLedger();
+ Position deletedPosition = managedCursor.getMarkDeletedPosition();
+ SortedMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfoSortedMap =
+ managedLedger.getLedgersInfo()
+ .subMap(deletedPosition.getLedgerId(), managedLedger.getLedgersInfo().lastKey());
+ long expiryLedgerId = -1;
+ for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) {
+ if (!MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) {
+ break;
+ }
+ expiryLedgerId = ledgerInfo.getLedgerId();
Review Comment:
```suggestion
if (!ledgerInfo.hasTimestamp()) {
break;
}
if (!MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) {
break;
}
expiryLedgerId = ledgerInfo.getLedgerId();
```
Would it be better to check that the ledger has a timestamp, to avoid a
potential NPE?
I know it shouldn't happen, because the loop never touches the last ledger.
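A minimal, self-contained sketch of the guarded loop suggested above. It does not use the Pulsar classes: `LedgerInfoSketch`, `LedgerExpirySketch`, and the simplified `isExpired` check are hypothetical stand-ins, and a `null` close timestamp is assumed to model `hasTimestamp()` returning false for a ledger that is still open.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for MLDataFormats.ManagedLedgerInfo.LedgerInfo.
// A null closeTimestampMs models hasTimestamp() == false (assumption).
final class LedgerInfoSketch {
    final long ledgerId;
    final Long closeTimestampMs;

    LedgerInfoSketch(long ledgerId, Long closeTimestampMs) {
        this.ledgerId = ledgerId;
        this.closeTimestampMs = closeTimestampMs;
    }
}

final class LedgerExpirySketch {
    // Simplified TTL check written for this sketch; it is not the
    // MessageImpl.isEntryExpired implementation.
    static boolean isExpired(int ttlSeconds, long timestampMs, long nowMs) {
        return ttlSeconds != 0 && nowMs > timestampMs + TimeUnit.SECONDS.toMillis(ttlSeconds);
    }

    // Returns the id of the last ledger whose close time is past the TTL,
    // or -1 if the first ledger is not expired or has no timestamp.
    static long lastExpiredLedgerId(List<LedgerInfoSketch> ledgers, int ttlSeconds, long nowMs) {
        long expiryLedgerId = -1;
        for (LedgerInfoSketch info : ledgers) {
            if (info.closeTimestampMs == null) {
                break; // no timestamp: stop instead of reading a missing value
            }
            if (!isExpired(ttlSeconds, info.closeTimestampMs, nowMs)) {
                break; // ledgers are ordered, so later ones are not expired either
            }
            expiryLedgerId = info.ledgerId;
        }
        return expiryLedgerId;
    }

    public static void main(String[] args) {
        List<LedgerInfoSketch> ledgers = List.of(
                new LedgerInfoSketch(1, 10_000L),
                new LedgerInfoSketch(2, 20_000L),
                new LedgerInfoSketch(3, null));
        System.out.println(lastExpiredLedgerId(ledgers, 1, 100_000L));
    }
}
```

With the guard in place, a ledger without a timestamp simply ends the scan, so the result is the last ledger that was both closed and past the TTL.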
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java:
##########
@@ -437,6 +440,28 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
}
+ @Test
+ public void testIncorrectClientClock() throws Exception {
+ final String ledgerAndCursorName = "testIncorrectClientClock";
+ int maxRollOverTimeMs = 1;
+ int maxTTLSeconds = 1;
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaximumRolloverTime(maxRollOverTimeMs, TimeUnit.MICROSECONDS);
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
+ ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
+ // set client clock to 10 days later
+ long incorrectPublishTimestamp = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(10);
+ for (int i = 0; i < 10; i++) {
+ ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp));
+ }
+ PersistentTopic mock = mock(PersistentTopic.class);
+ when(mock.getName()).thenReturn("topicname");
+ when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
+ PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+ Thread.sleep(Math.max(config.getMaximumRolloverTimeMs(), TimeUnit.SECONDS.toMillis(maxTTLSeconds)));
+ assertTrue(monitor.expireMessages(maxTTLSeconds));
Review Comment:
```suggestion
assertTrue(monitor.expireMessages(maxTTLSeconds));
assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java:
##########
@@ -437,6 +440,28 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
}
+ @Test
+ public void testIncorrectClientClock() throws Exception {
Review Comment:
It looks like the issue is that we can't rely on
`assertTrue(monitor.expireMessages(maxTTLSeconds));`
A `true` result doesn't mean any messages were expired. The method returns true
whenever there is no in-progress expiration task.
Maybe we can check the cursor's backlog instead:
```
assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
```
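To illustrate why the boolean result is a weak assertion, here is a small self-contained sketch. `ExpiryMonitorSketch` is a hypothetical, synchronous stand-in written for this note, not the `PersistentMessageExpiryMonitor` code: it assumes the method returns true whenever it wins an in-progress flag, whether or not anything was actually expired.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified monitor: the return value only reports that a
// check was started, not that any entry was expired.
final class ExpiryMonitorSketch {
    private final AtomicBoolean inProgress = new AtomicBoolean(false);
    private int backlog;

    ExpiryMonitorSketch(int backlog) {
        this.backlog = backlog;
    }

    // entriesAreExpired stands in for the outcome of the real expiry search.
    boolean expireMessages(boolean entriesAreExpired) {
        if (!inProgress.compareAndSet(false, true)) {
            return false; // another check is already running
        }
        try {
            if (entriesAreExpired) {
                backlog = 0; // expired entries are skipped by the cursor
            }
            return true; // true even when nothing was expired
        } finally {
            inProgress.set(false);
        }
    }

    int backlog() {
        return backlog;
    }

    public static void main(String[] args) {
        ExpiryMonitorSketch monitor = new ExpiryMonitorSketch(10);
        System.out.println(monitor.expireMessages(false) + " backlog=" + monitor.backlog());
        System.out.println(monitor.expireMessages(true) + " backlog=" + monitor.backlog());
    }
}
```

Both calls return true, but only the second one clears the backlog, which is why asserting on the backlog is the stronger check.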
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -100,6 +105,27 @@ public boolean expireMessages(int messageTTLInSeconds) {
}
}
+ private boolean checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTLInSeconds) {
+ if (cursor instanceof ManagedCursorImpl managedCursor) {
+ ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) managedCursor.getManagedLedger();
+ Position deletedPosition = managedCursor.getMarkDeletedPosition();
+ SortedMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfoSortedMap =
+ managedLedger.getLedgersInfo()
+ .subMap(deletedPosition.getLedgerId(), managedLedger.getLedgersInfo().lastKey());
+ long expiryLedgerId = -1;
+ for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : ledgerInfoSortedMap.values()) {
+ if (!MessageImpl.isEntryExpired(messageTTLInSeconds, ledgerInfo.getTimestamp())) {
+ break;
+ }
+ expiryLedgerId = ledgerInfo.getLedgerId();
+ }
+ if (expiryLedgerId > -1) {
+ findEntryComplete(PositionImpl.get(managedLedger.getNextValidLedger(expiryLedgerId), 0), null);
Review Comment:
```suggestion
findEntryComplete(PositionImpl.get(managedLedger.getNextValidLedger(expiryLedgerId), 0), null);
return true;
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java:
##########
@@ -437,6 +440,28 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
}
+ @Test
+ public void testIncorrectClientClock() throws Exception {
Review Comment:
It looks like the test can't cover the issue?
<img width="1686" alt="image"
src="https://github.com/apache/pulsar/assets/12592133/ca5869e7-4663-460f-9b3a-4e076d84a588">