This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 47b73076e13ee5a8b249dccb1362ccee2c6d9b0d
Author: fengyubiao <yubiao.f...@streamnative.io>
AuthorDate: Thu Aug 7 19:25:24 2025 +0800

    [improve][broker]Improve the anti-concurrency mechanism 
expirationCheckInProgress (#24607)
---
 .../persistent/PersistentMessageExpiryMonitor.java |   4 +-
 .../service/PersistentMessageFinderTest.java       |  30 ++---
 .../PersistentMessageExpiryMonitorTest.java        | 141 +++++++++++++++++++++
 3 files changed, 158 insertions(+), 17 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index f6ba8a2f9a2..a5ee05db767 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -194,6 +194,7 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
             if (subscription != null && subscription.getType() == 
SubType.Key_Shared) {
                 subscription.getDispatcher().markDeletePositionMoveForward();
             }
+            expirationCheckInProgress = FALSE;
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] Mark deleted {} messages", topicName, 
subName, numMessagesExpired);
             }
@@ -202,6 +203,7 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
         @Override
         public void markDeleteFailed(ManagedLedgerException exception, Object 
ctx) {
             log.warn("[{}][{}] Message expiry failed - mark delete failed", 
topicName, subName, exception);
+            expirationCheckInProgress = FALSE;
             updateRates();
         }
     };
@@ -220,9 +222,9 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
             if (log.isDebugEnabled()) {
                 log.debug("[{}][{}] No messages to expire", topicName, 
subName);
             }
+            expirationCheckInProgress = FALSE;
             updateRates();
         }
-        expirationCheckInProgress = FALSE;
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
index 4e0eee7f0b7..5a995c63df5 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java
@@ -19,12 +19,9 @@
 package org.apache.pulsar.broker.service;
 
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -43,6 +40,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.core.MediaType;
@@ -691,18 +689,24 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
         }
         
when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));
 
-        PersistentMessageExpiryMonitor monitor = spy(new 
PersistentMessageExpiryMonitor(topic,
-                cursor.getName(), cursor, subscription));
+        AtomicInteger counterCalledFindEntryComplete = new AtomicInteger(0);
+        PersistentMessageExpiryMonitor monitor = new 
PersistentMessageExpiryMonitor(topic,
+                cursor.getName(), cursor, subscription) {
+            @Override
+            public void findEntryComplete(Position position, Object ctx) {
+                super.findEntryComplete(position, ctx);
+                counterCalledFindEntryComplete.incrementAndGet();
+            }
+        };
         assertEquals(cursor.getMarkDeletedPosition(), 
PositionFactory.create(positions.get(0).getLedgerId(), -1));
         boolean issued;
 
         // Expire by position and verify mark delete position of cursor.
         issued = monitor.expireMessages(positions.get(15));
-        Awaitility.await().untilAsserted(() -> verify(monitor, 
times(1)).findEntryComplete(any(), any()));
+        Awaitility.await().untilAsserted(() -> 
assertEquals(counterCalledFindEntryComplete.get(), 1));
         assertEquals(cursor.getMarkDeletedPosition(), 
PositionFactory.create(positions.get(15).getLedgerId(),
                 positions.get(15).getEntryId()));
         assertTrue(issued);
-        clearInvocations(monitor);
 
         // Expire by position beyond last position and nothing should happen.
         issued = monitor.expireMessages(PositionFactory.create(100, 100));
@@ -712,32 +716,26 @@ public class PersistentMessageFinderTest extends 
MockedBookKeeperTestCase {
 
         // Expire by position again and verify mark delete position of cursor 
didn't change.
         issued = monitor.expireMessages(positions.get(15));
-        Awaitility.await().untilAsserted(() -> verify(monitor,
-                times(1)).findEntryComplete(any(), any()));
+        Awaitility.await().untilAsserted(() -> 
assertEquals(counterCalledFindEntryComplete.get(), 2));
         assertEquals(cursor.getMarkDeletedPosition(), 
PositionFactory.create(positions.get(15).getLedgerId(),
                 positions.get(15).getEntryId()));
         assertTrue(issued);
-        clearInvocations(monitor);
 
         // Expire by position before current mark delete position and
         // verify mark delete position of cursor didn't change.
         issued = monitor.expireMessages(positions.get(10));
-        Awaitility.await().untilAsserted(() -> verify(monitor,
-                times(1)).findEntryComplete(any(), any()));
+        Awaitility.await().untilAsserted(() -> 
assertEquals(counterCalledFindEntryComplete.get(), 3));
         assertEquals(cursor.getMarkDeletedPosition(), 
PositionFactory.create(positions.get(15).getLedgerId(),
                 positions.get(15).getEntryId()));
         assertTrue(issued);
-        clearInvocations(monitor);
 
         // Expire by position after current mark delete position and
         // verify mark delete position of cursor move to new position.
         issued = monitor.expireMessages(positions.get(16));
-        Awaitility.await().untilAsserted(() -> verify(monitor,
-                times(1)).findEntryComplete(any(), any()));
+        Awaitility.await().untilAsserted(() -> 
assertEquals(counterCalledFindEntryComplete.get(), 4));
         assertEquals(cursor.getMarkDeletedPosition(), 
PositionFactory.create(positions.get(16).getLedgerId(),
                 positions.get(16).getEntryId()));
         assertTrue(issued);
-        clearInvocations(monitor);
 
         ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class);
         PersistentMessageExpiryMonitor mockMonitor = spy(new 
PersistentMessageExpiryMonitor(topic,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java
new file mode 100644
index 00000000000..5535561a5fa
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
+import static org.testng.AssertJUnit.assertEquals;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.awaitility.Awaitility;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class PersistentMessageExpiryMonitorTest extends ProducerConsumerBase {
+
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /***
+     * Confirm the anti-concurrency mechanism 
"expirationCheckInProgressUpdater" works.
+     */
+    @Test
+    void testConcurrentlyExpireMessages() throws Exception {
+        final String topicName = 
BrokerTestUtil.newUniqueName("persistent://public/default/tp");
+        final String cursorName = "s1";
+        admin.topics().createNonPartitionedTopic(topicName);
+        admin.topics().createSubscriptionAsync(topicName, cursorName, 
MessageId.earliest);
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        producer.send("1");
+        producer.send("2");
+        producer.send("3");
+        PersistentTopic persistentTopic =
+                (PersistentTopic) 
pulsar.getBrokerService().getTopic(topicName, false).join().get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
+        ManagedCursorImpl cursor = (ManagedCursorImpl) 
ml.getCursors().get(cursorName);
+        ManagedCursorImpl spyCursor = Mockito.spy(cursor);
+
+        // Make the mark-deleting delay.
+        CountDownLatch firstFindingCompleted = new CountDownLatch(1);
+        AtomicInteger calledFindPositionCount = new AtomicInteger();
+        doAnswer(invocationOnMock -> {
+            firstFindingCompleted.countDown();
+            ml.getExecutor().execute(() -> {
+                try {
+                    Thread.sleep(3000);
+                    invocationOnMock.callRealMethod();
+                } catch (Throwable ex) {
+                    log.error("Unexpected exception when calling mark delete", 
ex);
+                }
+            });
+            return true;
+        }).when(spyCursor).asyncMarkDelete(any(Position.class), any(Map.class),
+                any(AsyncCallbacks.MarkDeleteCallback.class), any());
+        doAnswer(invocationOnMock -> {
+            calledFindPositionCount.incrementAndGet();
+            return invocationOnMock.callRealMethod();
+        }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), 
any(), any(), anyBoolean());
+        doAnswer(invocationOnMock -> {
+            calledFindPositionCount.incrementAndGet();
+            return invocationOnMock.callRealMethod();
+        }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), 
anyBoolean());
+        doAnswer(invocationOnMock -> {
+            calledFindPositionCount.incrementAndGet();
+            return invocationOnMock.callRealMethod();
+        }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any());
+
+        // Sleep 2s to make "find(1s)" get a position.
+        Thread.sleep(2000);
+
+        // Start two expire tasks concurrently.
+        PersistentMessageExpiryMonitor
+                monitor = new PersistentMessageExpiryMonitor(persistentTopic, 
cursorName, spyCursor, null);
+        CompletableFuture<Boolean> expireTask1 = new CompletableFuture<>();
+        new Thread(() -> {
+            expireTask1.complete(monitor.expireMessages(1));
+        }).start();
+        CompletableFuture<Boolean> expireTask2 = new CompletableFuture<>();
+        new Thread(() -> {
+            expireTask2.complete(monitor.expireMessages(1));
+        }).start();
+        firstFindingCompleted.await();
+        CompletableFuture<Boolean> expireTask3 = new CompletableFuture<>();
+        new Thread(() -> {
+            expireTask3.complete(monitor.expireMessages(1));
+        }).start();
+
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(monitor.getTotalMessageExpired(), 3);
+        });
+        // Verify: since the other 2 tasks have been prevented, the count of 
calling find position is 1.
+        Thread.sleep(1000);
+        assertEquals(1, calledFindPositionCount.get());
+
+        // cleanup.
+        producer.close();
+        admin.topics().delete(topicName);
+    }
+}

Reply via email to