This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 47eec875a81 [fix][broker] Fix PersistentMessageExpiryMonitor 
findEntryComplete() method may lose mark-delete properties in race condition 
(#25803)
47eec875a81 is described below

commit 47eec875a81d11a030a3781cbb123e273ced5e88
Author: Oneby Wang <[email protected]>
AuthorDate: Mon May 25 23:30:50 2026 +0800

    [fix][broker] Fix PersistentMessageExpiryMonitor findEntryComplete() method 
may lose mark-delete properties in race condition (#25803)
---
 .../persistent/PersistentMessageExpiryMonitor.java |  3 +-
 .../service/PersistentMessageFinderTest.java       | 58 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index a9f7e305104..2848ba2a0d2 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -245,8 +245,7 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
                     .attr("position", position)
                     .log("Expiring all messages until position");
             Position prevMarkDeletePos = cursor.getMarkDeletedPosition();
-            cursor.asyncMarkDelete(position, cursor.getProperties(), 
markDeleteCallback,
-                    cursor.getNumberOfEntriesInBacklog(false));
+            cursor.asyncMarkDelete(position, null, markDeleteCallback, 
cursor.getNumberOfEntriesInBacklog(false));
             if (!Objects.equals(cursor.getMarkDeletedPosition(), 
prevMarkDeletePos) && subscription != null) {
                 subscription.updateLastMarkDeleteAdvancedTimestamp();
             }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 8d9a2e45780..5bf4e1f97d4 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service;
 
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -34,11 +35,14 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.UnpooledByteBufAllocator;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -1098,4 +1102,58 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         assertNull(range.getRight());
         assertEquals(range.getLeft(), PositionFactory.create(1, 9));
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    void testExpireMessagesNeverLoseMarkDeleteProperties() throws Exception {
+        final String ledgerAndCursorName = 
"testExpireMessagesNeverLoseMarkDeleteProperties";
+
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setRetentionSizeInMB(10);
+        config.setRetentionTime(1, TimeUnit.HOURS);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) 
factory.open(ledgerAndCursorName, config);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ledger.openCursor(ledgerAndCursorName);
+        ManagedCursorImpl spyCursor = spy(cursor);
+
+        Position pos1 = ledger.addEntry(createMessageWrittenToLedger("msg-1"));
+        Position pos2 = ledger.addEntry(createMessageWrittenToLedger("msg-2"));
+
+        CountDownLatch expiryMarkDeleteEnteredLatch = new CountDownLatch(1);
+        CountDownLatch cursorMarkDeleteCompletedLatch = new CountDownLatch(1);
+        CountDownLatch expiryMarkDeleteCompletedLatch = new CountDownLatch(1);
+
+        doAnswer(invocation -> {
+            Map<String, Long> invocationProperties = invocation.getArgument(1);
+            // Pause the expiry-triggered mark-delete so the user markDelete() 
can complete first.
+            if (invocationProperties == null || 
invocationProperties.isEmpty()) {
+                expiryMarkDeleteEnteredLatch.countDown();
+                assertTrue(cursorMarkDeleteCompletedLatch.await(5, 
TimeUnit.SECONDS));
+                try {
+                    return invocation.callRealMethod();
+                } finally {
+                    expiryMarkDeleteCompletedLatch.countDown();
+                }
+            }
+
+            return invocation.callRealMethod();
+        }).when(spyCursor)
+                .asyncMarkDelete(any(Position.class), nullable(Map.class), 
any(AsyncCallbacks.MarkDeleteCallback.class),
+                        nullable(Object.class));
+
+        PersistentTopic topic = mockPersistentTopic("topicname");
+        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(topic,
+                spyCursor.getName(), spyCursor, null);
+
+        CompletableFuture.runAsync(() -> monitor.findEntryComplete(pos2, 
null));
+        assertTrue(expiryMarkDeleteEnteredLatch.await(5, TimeUnit.SECONDS));
+
+        Map<String, Long> properties = new HashMap<>();
+        properties.put("test-property", 1L);
+        spyCursor.markDelete(pos1, properties);
+        cursorMarkDeleteCompletedLatch.countDown();
+
+        assertTrue(expiryMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS));
+        assertEquals(spyCursor.getMarkDeletedPosition(), pos2);
+        assertEquals(spyCursor.getProperties(), properties);
+    }
 }

Reply via email to