This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 354a0592c1e9c559ea6ba0f171d9fb4d5213f33c
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Fri Feb 7 14:31:44 2020 +0900

    Fix bug where a backlog message that has not yet expired could be deleted due 
to TTL (#6211)
    
    Fixes #5579
    
    ### Motivation
    
    In Pulsar 2.4.1 and later versions, if message TTL is enabled, 
`PersistentMessageExpiryMonitor` always deletes one non-expired message every 5 
minutes.
    
    The cause of this bug is https://github.com/apache/pulsar/pull/4744. 
`PersistentMessageExpiryMonitor` expects 
`ManagedCursor#asyncFindNewestMatching()` to pass null as the found position to 
its callback (the monitor itself) when no expired messages exist.
    
https://github.com/apache/pulsar/blob/c5ba52983fee994de61984aae7d1757e9b738caf/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java#L119-L130
    
    However, due to the change in https://github.com/apache/pulsar/pull/4744, 
if no entry is found that matches the search condition, the callback will be 
passed `startPosition` instead of null now. For this reason, the earliest 
backlog message is always deleted by `PersistentMessageExpiryMonitor`.
    
    This means that unexpected message loss can occur.
    
    ### Modifications
    
    Revert the https://github.com/apache/pulsar/pull/4744 changes. The 
motivation of https://github.com/apache/pulsar/pull/4744 was to avoid an NPE caused 
in pulsar-sql, but that seems to have been fixed in 
https://github.com/apache/pulsar/pull/4757.
    
https://github.com/apache/pulsar/blob/2069f761753940ed6a1faca8999af70036f20fd6/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java#L363-L382
---
 .../main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java  | 5 ++++-
 .../java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java  | 6 +++---
 .../apache/pulsar/broker/service/PersistentMessageFinderTest.java   | 2 +-
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
index 4bce569..861d247 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java
@@ -67,7 +67,10 @@ class OpFindNewest implements ReadEntryCallback {
         switch (state) {
         case checkFirst:
             if (!condition.apply(entry)) {
-                callback.findEntryComplete(startPosition, 
OpFindNewest.this.ctx);
+                // If no entry is found that matches the condition, it is 
expected to pass null to the callback.
+                // Otherwise, a message before the expiration date will be 
deleted due to message TTL.
+                // cf. https://github.com/apache/pulsar/issues/5579
+                callback.findEntryComplete(null, OpFindNewest.this.ctx);
                 return;
             } else {
                 lastMatchedPosition = position;
diff --git 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index ddb8da4..a02b365 100644
--- 
a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ 
b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -1725,7 +1725,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ledger.addEntry("not-expired".getBytes(Encoding));
         ledger.addEntry("not-expired".getBytes(Encoding));
 
-        assertEquals(c1.readPosition,
+        assertNull(
                 c1.findNewestMatching(entry -> 
Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))));
     }
 
@@ -2188,7 +2188,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
         ManagedCursorImpl c1 = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
 
-        Position firstPosition = 
ledger.addEntry(getEntryPublishTime("retained1"));
+        ledger.addEntry(getEntryPublishTime("retained1"));
         // space apart message publish times
         Thread.sleep(100);
         ledger.addEntry(getEntryPublishTime("retained2"));
@@ -2217,7 +2217,7 @@ public class ManagedCursorTest extends 
MockedBookKeeperTestCase {
         assertEquals(found.getEntryId(), expectedEntryId);
 
         found = (PositionImpl) findPositionFromAllEntries(c1, 0);
-        assertEquals(found, firstPosition);
+        assertNull(found);
     }
 
     @Test(timeOut = 20000)
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index ab0da31..a393ae3 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -162,7 +162,7 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         future = findMessage(result, c1, beginTimestamp);
         future.get();
         assertNull(result.exception);
-        assertEquals(result.position, c1.getFirstPosition());
+        assertNull(result.position);
 
         result.reset();
         future = findMessage(result, c1, endTimestamp);

Reply via email to