This is an automated email from the ASF dual-hosted git repository. technoboy pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 47b73076e13ee5a8b249dccb1362ccee2c6d9b0d Author: fengyubiao <yubiao.f...@streamnative.io> AuthorDate: Thu Aug 7 19:25:24 2025 +0800 [improve][broker]Improve the anti-concurrency mechanism expirationCheckInProgress (#24607) --- .../persistent/PersistentMessageExpiryMonitor.java | 4 +- .../service/PersistentMessageFinderTest.java | 30 ++--- .../PersistentMessageExpiryMonitorTest.java | 141 +++++++++++++++++++++ 3 files changed, 158 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index f6ba8a2f9a2..a5ee05db767 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -194,6 +194,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag if (subscription != null && subscription.getType() == SubType.Key_Shared) { subscription.getDispatcher().markDeletePositionMoveForward(); } + expirationCheckInProgress = FALSE; if (log.isDebugEnabled()) { log.debug("[{}][{}] Mark deleted {} messages", topicName, subName, numMessagesExpired); } @@ -202,6 +203,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}][{}] Message expiry failed - mark delete failed", topicName, subName, exception); + expirationCheckInProgress = FALSE; updateRates(); } }; @@ -220,9 +222,9 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag if (log.isDebugEnabled()) { log.debug("[{}][{}] No messages to expire", topicName, subName); } + expirationCheckInProgress = FALSE; updateRates(); } - expirationCheckInProgress = FALSE; } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 4e0eee7f0b7..5a995c63df5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -19,12 +19,9 @@ package org.apache.pulsar.broker.service; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -43,6 +40,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; @@ -691,18 +689,24 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { } when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1)); - PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor(topic, - cursor.getName(), cursor, subscription)); + AtomicInteger counterCalledFindEntryComplete = new AtomicInteger(0); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(topic, + cursor.getName(), cursor, subscription) { + @Override + public void findEntryComplete(Position position, Object ctx) { + super.findEntryComplete(position, ctx); + counterCalledFindEntryComplete.incrementAndGet(); + } + }; assertEquals(cursor.getMarkDeletedPosition(), PositionFactory.create(positions.get(0).getLedgerId(), -1)); boolean issued; // Expire by position and verify mark delete position of cursor. issued = monitor.expireMessages(positions.get(15)); - Awaitility.await().untilAsserted(() -> verify(monitor, times(1)).findEntryComplete(any(), any())); + Awaitility.await().untilAsserted(() -> assertEquals(counterCalledFindEntryComplete.get(), 1)); assertEquals(cursor.getMarkDeletedPosition(), PositionFactory.create(positions.get(15).getLedgerId(), positions.get(15).getEntryId())); assertTrue(issued); - clearInvocations(monitor); // Expire by position beyond last position and nothing should happen. issued = monitor.expireMessages(PositionFactory.create(100, 100)); @@ -712,32 +716,26 @@ public class PersistentMessageFinderTest extends MockedBookKeeperTestCase { // Expire by position again and verify mark delete position of cursor didn't change. issued = monitor.expireMessages(positions.get(15)); - Awaitility.await().untilAsserted(() -> verify(monitor, - times(1)).findEntryComplete(any(), any())); + Awaitility.await().untilAsserted(() -> assertEquals(counterCalledFindEntryComplete.get(), 2)); assertEquals(cursor.getMarkDeletedPosition(), PositionFactory.create(positions.get(15).getLedgerId(), positions.get(15).getEntryId())); assertTrue(issued); - clearInvocations(monitor); // Expire by position before current mark delete position and // verify mark delete position of cursor didn't change. issued = monitor.expireMessages(positions.get(10)); - Awaitility.await().untilAsserted(() -> verify(monitor, - times(1)).findEntryComplete(any(), any())); + Awaitility.await().untilAsserted(() -> assertEquals(counterCalledFindEntryComplete.get(), 3)); assertEquals(cursor.getMarkDeletedPosition(), PositionFactory.create(positions.get(15).getLedgerId(), positions.get(15).getEntryId())); assertTrue(issued); - clearInvocations(monitor); // Expire by position after current mark delete position and // verify mark delete position of cursor move to new position. issued = monitor.expireMessages(positions.get(16)); - Awaitility.await().untilAsserted(() -> verify(monitor, - times(1)).findEntryComplete(any(), any())); + Awaitility.await().untilAsserted(() -> assertEquals(counterCalledFindEntryComplete.get(), 4)); assertEquals(cursor.getMarkDeletedPosition(), PositionFactory.create(positions.get(16).getLedgerId(), positions.get(16).getEntryId())); assertTrue(issued); - clearInvocations(monitor); ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class); PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor(topic, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java new file mode 100644 index 00000000000..5535561a5fa --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.testng.AssertJUnit.assertEquals; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.awaitility.Awaitility; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +public class PersistentMessageExpiryMonitorTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + /*** + * Confirm the anti-concurrency mechanism "expirationCheckInProgressUpdater" works. + */ + @Test + void testConcurrentlyExpireMessages() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String cursorName = "s1"; + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscriptionAsync(topicName, cursorName, MessageId.earliest); + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + producer.send("1"); + producer.send("2"); + producer.send("3"); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(cursorName); + ManagedCursorImpl spyCursor = Mockito.spy(cursor); + + // Make the mark-deleting delay. + CountDownLatch firstFindingCompleted = new CountDownLatch(1); + AtomicInteger calledFindPositionCount = new AtomicInteger(); + doAnswer(invocationOnMock -> { + firstFindingCompleted.countDown(); + ml.getExecutor().execute(() -> { + try { + Thread.sleep(3000); + invocationOnMock.callRealMethod(); + } catch (Throwable ex) { + log.error("Unexpected exception when calling mark delete", ex); + } + }); + return true; + }).when(spyCursor).asyncMarkDelete(any(Position.class), any(Map.class), + any(AsyncCallbacks.MarkDeleteCallback.class), any()); + doAnswer(invocationOnMock -> { + calledFindPositionCount.incrementAndGet(); + return invocationOnMock.callRealMethod(); + }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), any(), any(), anyBoolean()); + doAnswer(invocationOnMock -> { + calledFindPositionCount.incrementAndGet(); + return invocationOnMock.callRealMethod(); + }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), anyBoolean()); + doAnswer(invocationOnMock -> { + calledFindPositionCount.incrementAndGet(); + return invocationOnMock.callRealMethod(); + }).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any()); + + // Sleep 2s to make "find(1s)" get a position. + Thread.sleep(2000); + + // Start two expire tasks concurrently. + PersistentMessageExpiryMonitor + monitor = new PersistentMessageExpiryMonitor(persistentTopic, cursorName, spyCursor, null); + CompletableFuture<Boolean> expireTask1 = new CompletableFuture<>(); + new Thread(() -> { + expireTask1.complete(monitor.expireMessages(1)); + }).start(); + CompletableFuture<Boolean> expireTask2 = new CompletableFuture<>(); + new Thread(() -> { + expireTask2.complete(monitor.expireMessages(1)); + }).start(); + firstFindingCompleted.await(); + CompletableFuture<Boolean> expireTask3 = new CompletableFuture<>(); + new Thread(() -> { + expireTask3.complete(monitor.expireMessages(1)); + }).start(); + + Awaitility.await().untilAsserted(() -> { + Assert.assertEquals(monitor.getTotalMessageExpired(), 3); + }); + // Verify: since the other 2 tasks have been prevented, the count of calling find position is 1. + Thread.sleep(1000); + assertEquals(1, calledFindPositionCount.get()); + + // cleanup. + producer.close(); + admin.topics().delete(topicName); + } +}