This is an automated email from the ASF dual-hosted git repository. guangning pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 354a0592c1e9c559ea6ba0f171d9fb4d5213f33c Author: Masahiro Sakamoto <[email protected]> AuthorDate: Fri Feb 7 14:31:44 2020 +0900 Fix bug that backlog message that has not yet expired could be deleted due to TTL (#6211) Fixes #5579 ### Motivation In Pulsar 2.4.1 and later versions, if message TTL is enabled, `PersistentMessageExpiryMonitor` always deletes one non-expired message every 5 minutes. The cause of this bug is https://github.com/apache/pulsar/pull/4744. `PersistentMessageExpiryMonitor`, which registers itself as the callback, expects `ManagedCursor#asyncFindNewestMatching()` to pass null as the found position if no expired messages exist. https://github.com/apache/pulsar/blob/c5ba52983fee994de61984aae7d1757e9b738caf/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java#L119-L130 However, due to the change in https://github.com/apache/pulsar/pull/4744, if no entry is found that matches the search condition, the callback is now passed `startPosition` instead of null. For this reason, the earliest backlog message is always deleted by `PersistentMessageExpiryMonitor`. This means that unexpected message loss can occur. ### Modifications Revert the https://github.com/apache/pulsar/pull/4744 changes. The motivation of https://github.com/apache/pulsar/pull/4744 was to avoid an NPE in pulsar-sql, but that seems to have been fixed in https://github.com/apache/pulsar/pull/4757. 
https://github.com/apache/pulsar/blob/2069f761753940ed6a1faca8999af70036f20fd6/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java#L363-L382 --- .../main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java | 5 ++++- .../java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java | 6 +++--- .../apache/pulsar/broker/service/PersistentMessageFinderTest.java | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java index 4bce569..861d247 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java @@ -67,7 +67,10 @@ class OpFindNewest implements ReadEntryCallback { switch (state) { case checkFirst: if (!condition.apply(entry)) { - callback.findEntryComplete(startPosition, OpFindNewest.this.ctx); + // If no entry is found that matches the condition, it is expected to pass null to the callback. + // Otherwise, a message before the expiration date will be deleted due to message TTL. + // cf. 
https://github.com/apache/pulsar/issues/5579 + callback.findEntryComplete(null, OpFindNewest.this.ctx); return; } else { lastMatchedPosition = position; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index ddb8da4..a02b365 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -1725,7 +1725,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { ledger.addEntry("not-expired".getBytes(Encoding)); ledger.addEntry("not-expired".getBytes(Encoding)); - assertEquals(c1.readPosition, + assertNull( c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)))); } @@ -2188,7 +2188,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { ManagedLedger ledger = factory.open(ledgerAndCursorName, config); ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); - Position firstPosition = ledger.addEntry(getEntryPublishTime("retained1")); + ledger.addEntry(getEntryPublishTime("retained1")); // space apart message publish times Thread.sleep(100); ledger.addEntry(getEntryPublishTime("retained2")); @@ -2217,7 +2217,7 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase { assertEquals(found.getEntryId(), expectedEntryId); found = (PositionImpl) findPositionFromAllEntries(c1, 0); - assertEquals(found, firstPosition); + assertNull(found); } @Test(timeOut = 20000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index ab0da31..a393ae3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -162,7 +162,7 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { future = findMessage(result, c1, beginTimestamp); future.get(); assertNull(result.exception); - assertEquals(result.position, c1.getFirstPosition()); + assertNull(result.position); result.reset(); future = findMessage(result, c1, endTimestamp);
