This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 40631f905303ca54967a987c1fdd070e06cb85d7 Author: 道君 <[email protected]> AuthorDate: Sat Aug 17 19:50:29 2024 +0800 [fix][broker] Skip reading entries from closed cursor. (#22751) (cherry picked from commit aa8226f45e3b28a14377f9f949d5a34f61b27e9a) --- .../PersistentDispatcherMultipleConsumers.java | 26 ++++- .../PersistentDispatcherSingleActiveConsumer.java | 26 ++++- .../PersistentDispatcherMultipleConsumersTest.java | 71 ++++++++++++ ...rsistentDispatcherSingleActiveConsumerTest.java | 127 +++++++++++++++++++++ 4 files changed, 244 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 01728f94be7..6e294cc7db1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service.persistent; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Range; import java.util.ArrayList; @@ -299,6 +300,12 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } public synchronized void readMoreEntries() { + if (cursor.isClosed()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Cursor is already closed, skipping read more entries.", cursor.getName()); + } + return; + } if (isSendInProgress()) { // we cannot read more entries while sending the previous batch // otherwise we could re-read the same entries and send duplicates @@ -895,7 +902,14 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul ReadType readType = (ReadType) ctx; long waitTimeMillis = readFailureBackoff.next(); - if (exception instanceof NoMoreEntriesToReadException) { + // Do not keep reading more entries if the cursor is already closed. + if (exception instanceof ManagedLedgerException.CursorAlreadyClosedException) { + if (log.isDebugEnabled()) { + log.debug("[{}] Cursor is already closed, skipping read more entries", cursor.getName()); + } + // Set the wait time to -1 to avoid rescheduling the read. + waitTimeMillis = -1; + } else if (exception instanceof NoMoreEntriesToReadException) { if (cursor.getNumberOfEntriesInBacklog(false) == 0) { // Topic has been terminated and there are no more entries to read // Notify the consumer only if all the messages were already acknowledged @@ -934,7 +948,14 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } readBatchSize = serviceConfig.getDispatcherMinReadBatchSize(); + // Skip read if the waitTimeMillis is a negative value. 
+ if (waitTimeMillis >= 0) { + scheduleReadEntriesWithDelay(exception, readType, waitTimeMillis); + } + } + @VisibleForTesting + void scheduleReadEntriesWithDelay(Exception e, ReadType readType, long waitTimeMillis) { topic.getBrokerService().executor().schedule(() -> { synchronized (PersistentDispatcherMultipleConsumers.this) { // If it's a replay read we need to retry even if there's already @@ -944,11 +965,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul log.info("[{}] Retrying read operation", name); readMoreEntries(); } else { - log.info("[{}] Skipping read retry: havePendingRead {}", name, havePendingRead, exception); + log.info("[{}] Skipping read retry: havePendingRead {}", name, havePendingRead, e); } } }, waitTimeMillis, TimeUnit.MILLISECONDS); - } private boolean needTrimAckedMessages() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index adaa5a66a0c..d236b5b1db0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.persistent; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; +import com.google.common.annotations.VisibleForTesting; import io.netty.util.Recycler; import java.util.Iterator; import java.util.List; @@ -313,7 +314,14 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher redeliverUnacknowledgedMessages(consumer, DEFAULT_CONSUMER_EPOCH); } - private void readMoreEntries(Consumer consumer) { + 
@VisibleForTesting + void readMoreEntries(Consumer consumer) { + if (cursor.isClosed()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Cursor is already closed, skipping read more entries", cursor.getName()); + } + return; + } // consumer can be null when all consumers are disconnected from broker. // so skip reading more entries if currently there is no active consumer. if (null == consumer) { @@ -499,6 +507,14 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher Consumer c = readEntriesCtx.getConsumer(); readEntriesCtx.recycle(); + // Do not keep reading messages from a closed cursor. + if (exception instanceof ManagedLedgerException.CursorAlreadyClosedException) { + if (log.isDebugEnabled()) { + log.debug("[{}] Cursor was already closed, skipping read more entries", cursor.getName()); + } + return; + } + if (exception instanceof ConcurrentWaitCallbackException) { // At most one pending read request is allowed when there are no more entries, we should not trigger more // read operations in this case and just wait the existing read operation completes. 
@@ -535,6 +551,11 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher // Reduce read batch size to avoid flooding bookies with retries readBatchSize = serviceConfig.getDispatcherMinReadBatchSize(); + scheduleReadEntriesWithDelay(c, waitTimeMillis); + } + + @VisibleForTesting + void scheduleReadEntriesWithDelay(Consumer c, long delay) { topic.getBrokerService().executor().schedule(() -> { // Jump again into dispatcher dedicated thread @@ -556,8 +577,7 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher } } }); - }, waitTimeMillis, TimeUnit.MILLISECONDS); - + }, delay, TimeUnit.MILLISECONDS); } @Override diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java index f24c5c5933e..a03ed92b815 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java @@ -20,15 +20,24 @@ package org.apache.pulsar.broker.service.persistent; import com.carrotsearch.hppc.ObjectSet; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.Dispatcher; +import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import 
org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.awaitility.reflect.WhiteboxImpl; +import org.mockito.Mockito; +import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -98,4 +107,66 @@ public class PersistentDispatcherMultipleConsumersTest extends ProducerConsumerB consumer.close(); admin.topics().delete(topicName, false); } + + @Test + public void testSkipReadEntriesFromCloseCursor() throws Exception { + final String topicName = + BrokerTestUtil.newUniqueName("persistent://public/default/testSkipReadEntriesFromCloseCursor"); + final String subscription = "s1"; + admin.topics().createNonPartitionedTopic(topicName); + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + for (int i = 0; i < 10; i++) { + producer.send("message-" + i); + } + producer.close(); + + // Get the dispatcher of the topic. + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService() + .getTopic(topicName, false).join().get(); + + ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class); + Mockito.doReturn(subscription).when(cursor).getName(); + Subscription sub = Mockito.mock(PersistentSubscription.class); + Mockito.doReturn(topic).when(sub).getTopic(); + // Mock the dispatcher. + PersistentDispatcherMultipleConsumers dispatcher = + Mockito.spy(new PersistentDispatcherMultipleConsumers(topic, cursor, sub)); + // Return 10 permits to make the dispatcher can read more entries. + Mockito.doReturn(10).when(dispatcher).getFirstAvailableConsumerPermits(); + + // Make the count + 1 when call the scheduleReadEntriesWithDelay(...). 
+ AtomicInteger callScheduleReadEntriesWithDelayCnt = new AtomicInteger(0); + Mockito.doAnswer(inv -> { + callScheduleReadEntriesWithDelayCnt.getAndIncrement(); + return inv.callRealMethod(); + }).when(dispatcher).scheduleReadEntriesWithDelay(Mockito.any(), Mockito.any(), Mockito.anyLong()); + + // Make the count + 1 when call the readEntriesFailed(...). + AtomicInteger callReadEntriesFailed = new AtomicInteger(0); + Mockito.doAnswer(inv -> { + callReadEntriesFailed.getAndIncrement(); + return inv.callRealMethod(); + }).when(dispatcher).readEntriesFailed(Mockito.any(), Mockito.any()); + + Mockito.doReturn(false).when(cursor).isClosed(); + + // Mock the readEntriesOrWait(...) to simulate the cursor is closed. + Mockito.doAnswer(inv -> { + PersistentDispatcherMultipleConsumers dispatcher1 = inv.getArgument(2); + dispatcher1.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("cursor closed"), + null); + return null; + }).when(cursor).asyncReadEntriesOrWait(Mockito.anyInt(), Mockito.anyLong(), Mockito.eq(dispatcher), + Mockito.any(), Mockito.any()); + + dispatcher.readMoreEntries(); + + // Verify: the readEntriesFailed should be called once and the scheduleReadEntriesWithDelay should not be called. + Assert.assertTrue(callReadEntriesFailed.get() == 1 && callScheduleReadEntriesWithDelayCnt.get() == 0); + + // Verify: the topic can be deleted successfully. 
+ admin.topics().delete(topicName, false); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java new file mode 100644 index 00000000000..a4c9e26ffb8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service.persistent; + +import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class PersistentDispatcherSingleActiveConsumerTest extends ProducerConsumerBase { + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testSkipReadEntriesFromCloseCursor() throws Exception { + final String topicName = + BrokerTestUtil.newUniqueName("persistent://public/default/testSkipReadEntriesFromCloseCursor"); + final String subscription = "s1"; + admin.topics().createNonPartitionedTopic(topicName); + + @Cleanup + Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + for (int i = 0; i < 10; i++) { + producer.send("message-" + i); + } + producer.close(); + + // Get the dispatcher of the topic. 
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService() + .getTopic(topicName, false).join().get(); + + ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class); + Mockito.doReturn(subscription).when(cursor).getName(); + Subscription sub = Mockito.mock(PersistentSubscription.class); + Mockito.doReturn(topic).when(sub).getTopic(); + // Mock the dispatcher. + PersistentDispatcherSingleActiveConsumer dispatcher = + Mockito.spy(new PersistentDispatcherSingleActiveConsumer(cursor, CommandSubscribe.SubType.Exclusive,0, topic, sub)); + + // Mock a consumer + Consumer consumer = Mockito.mock(Consumer.class); + consumer.getAvailablePermits(); + Mockito.doReturn(10).when(consumer).getAvailablePermits(); + Mockito.doReturn(10).when(consumer).getAvgMessagesPerEntry(); + Mockito.doReturn("test").when(consumer).consumerName(); + Mockito.doReturn(true).when(consumer).isWritable(); + Mockito.doReturn(false).when(consumer).readCompacted(); + + // Make the consumer as the active consumer. + Mockito.doReturn(consumer).when(dispatcher).getActiveConsumer(); + + // Make the count + 1 when call the scheduleReadEntriesWithDelay(...). + AtomicInteger callScheduleReadEntriesWithDelayCnt = new AtomicInteger(0); + Mockito.doAnswer(inv -> { + callScheduleReadEntriesWithDelayCnt.getAndIncrement(); + return inv.callRealMethod(); + }).when(dispatcher).scheduleReadEntriesWithDelay(Mockito.eq(consumer), Mockito.anyLong()); + + // Make the count + 1 when call the readEntriesFailed(...). + AtomicInteger callReadEntriesFailed = new AtomicInteger(0); + Mockito.doAnswer(inv -> { + callReadEntriesFailed.getAndIncrement(); + return inv.callRealMethod(); + }).when(dispatcher).readEntriesFailed(Mockito.any(), Mockito.any()); + + Mockito.doReturn(false).when(cursor).isClosed(); + + // Mock the readEntriesOrWait(...) to simulate the cursor is closed. 
+ Mockito.doAnswer(inv -> { + PersistentDispatcherSingleActiveConsumer dispatcher1 = inv.getArgument(2); + dispatcher1.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("cursor closed"), + null); + return null; + }).when(cursor).asyncReadEntriesOrWait(Mockito.anyInt(), Mockito.anyLong(), Mockito.eq(dispatcher), + Mockito.any(), Mockito.any()); + + dispatcher.readMoreEntries(consumer); + + // Verify: the readEntriesFailed should be called once and the scheduleReadEntriesWithDelay should not be called. + Assert.assertTrue(callReadEntriesFailed.get() == 1 && callScheduleReadEntriesWithDelayCnt.get() == 0); + + // Verify: the topic can be deleted successfully. + admin.topics().delete(topicName, false); + } +}
