codelipenghui commented on code in PR #21940:
URL: https://github.com/apache/pulsar/pull/21940#discussion_r1467418275


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -100,6 +105,27 @@ public boolean expireMessages(int messageTTLInSeconds) {
         }
     }
 
+    private boolean checkExpiryByLedgerClosureTime(ManagedCursor cursor, int 
messageTTLInSeconds) {
+        if (cursor instanceof ManagedCursorImpl managedCursor) {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
managedCursor.getManagedLedger();
+            Position deletedPosition = managedCursor.getMarkDeletedPosition();
+            SortedMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
ledgerInfoSortedMap =
+                    managedLedger.getLedgersInfo()
+                            .subMap(deletedPosition.getLedgerId(), 
managedLedger.getLedgersInfo().lastKey());
+            long expiryLedgerId = -1;
+            for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : 
ledgerInfoSortedMap.values()) {
+                if (!MessageImpl.isEntryExpired(messageTTLInSeconds, 
ledgerInfo.getTimestamp())) {
+                    break;
+                }
+                expiryLedgerId = ledgerInfo.getLedgerId();

Review Comment:
   ```suggestion
                   if (!ledgerInfo.hasTimestamp()) {
                       break;
                   }
                   if (!MessageImpl.isEntryExpired(messageTTLInSeconds, 
ledgerInfo.getTimestamp())) {
                       break;
                   }
                   expiryLedgerId = ledgerInfo.getLedgerId();
   ```
   
   Is it better to add a check to make sure the ledger has timestamp to avoid a 
potential NPE?
   I know it shouldn't happen because the loop will not touch the last ledger.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java:
##########
@@ -437,6 +440,28 @@ void 
testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
 
     }
 
+    @Test
+    public void testIncorrectClientClock() throws Exception {
+        final String ledgerAndCursorName = "testIncorrectClientClock";
+        int maxRollOverTimeMs = 1;
+        int maxTTLSeconds = 1;
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaximumRolloverTime(maxRollOverTimeMs, 
TimeUnit.MICROSECONDS);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
+        // set client clock to 10 days later
+        long incorrectPublishTimestamp = System.currentTimeMillis() + 
TimeUnit.DAYS.toMillis(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.addEntry(createMessageWrittenToLedger("msg" + i, 
incorrectPublishTimestamp));
+        }
+        PersistentTopic mock = mock(PersistentTopic.class);
+        when(mock.getName()).thenReturn("topicname");
+        when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
+        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
+        Thread.sleep(Math.max(config.getMaximumRolloverTimeMs(), 
TimeUnit.SECONDS.toMillis(maxTTLSeconds)));
+        assertTrue(monitor.expireMessages(maxTTLSeconds));

Review Comment:
   ```suggestion
           assertTrue(monitor.expireMessages(maxTTLSeconds));
           assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java:
##########
@@ -437,6 +440,28 @@ void 
testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
 
     }
 
+    @Test
+    public void testIncorrectClientClock() throws Exception {

Review Comment:
   It looks like the issue is we can't use 
`assertTrue(monitor.expireMessages(maxTTLSeconds));`
   It doesn't mean there are messages is expired. It will return true if there 
is no in-progress expiration task.
   
   Maybe we can try to check the backlog of the 
   
   ```
   assertEquals(c1.getNumberOfEntriesInBacklog(true), 0);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -100,6 +105,27 @@ public boolean expireMessages(int messageTTLInSeconds) {
         }
     }
 
+    private boolean checkExpiryByLedgerClosureTime(ManagedCursor cursor, int 
messageTTLInSeconds) {
+        if (cursor instanceof ManagedCursorImpl managedCursor) {
+            ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) 
managedCursor.getManagedLedger();
+            Position deletedPosition = managedCursor.getMarkDeletedPosition();
+            SortedMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> 
ledgerInfoSortedMap =
+                    managedLedger.getLedgersInfo()
+                            .subMap(deletedPosition.getLedgerId(), 
managedLedger.getLedgersInfo().lastKey());
+            long expiryLedgerId = -1;
+            for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo : 
ledgerInfoSortedMap.values()) {
+                if (!MessageImpl.isEntryExpired(messageTTLInSeconds, 
ledgerInfo.getTimestamp())) {
+                    break;
+                }
+                expiryLedgerId = ledgerInfo.getLedgerId();
+            }
+            if (expiryLedgerId > -1) {
+                
findEntryComplete(PositionImpl.get(managedLedger.getNextValidLedger(expiryLedgerId),
 0), null);

Review Comment:
   ```suggestion
                   
findEntryComplete(PositionImpl.get(managedLedger.getNextValidLedger(expiryLedgerId),
 0), null);
   return true;
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java:
##########
@@ -437,6 +440,28 @@ void 
testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
 
     }
 
+    @Test
+    public void testIncorrectClientClock() throws Exception {

Review Comment:
   It looks like the test can't cover the issue?
   
   <img width="1686" alt="image" 
src="https://github.com/apache/pulsar/assets/12592133/ca5869e7-4663-460f-9b3a-4e076d84a588";>
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to