This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 40631f905303ca54967a987c1fdd070e06cb85d7
Author: 道君 <[email protected]>
AuthorDate: Sat Aug 17 19:50:29 2024 +0800

    [fix][broker] Skip reading entries from closed cursor. (#22751)
    
    (cherry picked from commit aa8226f45e3b28a14377f9f949d5a34f61b27e9a)
---
 .../PersistentDispatcherMultipleConsumers.java     |  26 ++++-
 .../PersistentDispatcherSingleActiveConsumer.java  |  26 ++++-
 .../PersistentDispatcherMultipleConsumersTest.java |  71 ++++++++++++
 ...rsistentDispatcherSingleActiveConsumerTest.java | 127 +++++++++++++++++++++
 4 files changed, 244 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 01728f94be7..6e294cc7db1 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.persistent;
 
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import java.util.ArrayList;
@@ -299,6 +300,12 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
     }
 
     public synchronized void readMoreEntries() {
+        if (cursor.isClosed()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Cursor is already closed, skipping read more 
entries.", cursor.getName());
+            }
+            return;
+        }
         if (isSendInProgress()) {
             // we cannot read more entries while sending the previous batch
             // otherwise we could re-read the same entries and send duplicates
@@ -895,7 +902,14 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         ReadType readType = (ReadType) ctx;
         long waitTimeMillis = readFailureBackoff.next();
 
-        if (exception instanceof NoMoreEntriesToReadException) {
+        // Do not keep reading more entries if the cursor is already closed.
+        if (exception instanceof 
ManagedLedgerException.CursorAlreadyClosedException) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Cursor is already closed, skipping read more 
entries", cursor.getName());
+            }
+            // Set the wait time to -1 to avoid rescheduling the read.
+            waitTimeMillis = -1;
+        } else if (exception instanceof NoMoreEntriesToReadException) {
             if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
                 // Topic has been terminated and there are no more entries to 
read
                 // Notify the consumer only if all the messages were already 
acknowledged
@@ -934,7 +948,14 @@ public class PersistentDispatcherMultipleConsumers extends 
AbstractDispatcherMul
         }
 
         readBatchSize = serviceConfig.getDispatcherMinReadBatchSize();
+        // Skip the read if waitTimeMillis is a negative value.
+        if (waitTimeMillis >= 0) {
+            scheduleReadEntriesWithDelay(exception, readType, waitTimeMillis);
+        }
+    }
 
+    @VisibleForTesting
+    void scheduleReadEntriesWithDelay(Exception e, ReadType readType, long 
waitTimeMillis) {
         topic.getBrokerService().executor().schedule(() -> {
             synchronized (PersistentDispatcherMultipleConsumers.this) {
                 // If it's a replay read we need to retry even if there's 
already
@@ -944,11 +965,10 @@ public class PersistentDispatcherMultipleConsumers 
extends AbstractDispatcherMul
                     log.info("[{}] Retrying read operation", name);
                     readMoreEntries();
                 } else {
-                    log.info("[{}] Skipping read retry: havePendingRead {}", 
name, havePendingRead, exception);
+                    log.info("[{}] Skipping read retry: havePendingRead {}", 
name, havePendingRead, e);
                 }
             }
         }, waitTimeMillis, TimeUnit.MILLISECONDS);
-
     }
 
     private boolean needTrimAckedMessages() {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index adaa5a66a0c..d236b5b1db0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
 
 import static 
org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
 import static 
org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.Recycler;
 import java.util.Iterator;
 import java.util.List;
@@ -313,7 +314,14 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         redeliverUnacknowledgedMessages(consumer, DEFAULT_CONSUMER_EPOCH);
     }
 
-    private void readMoreEntries(Consumer consumer) {
+    @VisibleForTesting
+    void readMoreEntries(Consumer consumer) {
+        if (cursor.isClosed()) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Cursor is already closed, skipping read more 
entries", cursor.getName());
+            }
+            return;
+        }
         // consumer can be null when all consumers are disconnected from 
broker.
         // so skip reading more entries if currently there is no active 
consumer.
         if (null == consumer) {
@@ -499,6 +507,14 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         Consumer c = readEntriesCtx.getConsumer();
         readEntriesCtx.recycle();
 
+        // Do not keep reading messages from a closed cursor.
+        if (exception instanceof 
ManagedLedgerException.CursorAlreadyClosedException) {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Cursor was already closed, skipping read more 
entries", cursor.getName());
+            }
+            return;
+        }
+
         if (exception instanceof ConcurrentWaitCallbackException) {
             // At most one pending read request is allowed when there are no 
more entries, we should not trigger more
             // read operations in this case and just wait the existing read 
operation completes.
@@ -535,6 +551,11 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
         // Reduce read batch size to avoid flooding bookies with retries
         readBatchSize = serviceConfig.getDispatcherMinReadBatchSize();
 
+        scheduleReadEntriesWithDelay(c, waitTimeMillis);
+    }
+
+    @VisibleForTesting
+    void scheduleReadEntriesWithDelay(Consumer c, long delay) {
         topic.getBrokerService().executor().schedule(() -> {
 
             // Jump again into dispatcher dedicated thread
@@ -556,8 +577,7 @@ public class PersistentDispatcherSingleActiveConsumer 
extends AbstractDispatcher
                     }
                 }
             });
-        }, waitTimeMillis, TimeUnit.MILLISECONDS);
-
+        }, delay, TimeUnit.MILLISECONDS);
     }
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
index f24c5c5933e..a03ed92b815 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersTest.java
@@ -20,15 +20,24 @@ package org.apache.pulsar.broker.service.persistent;
 
 import com.carrotsearch.hppc.ObjectSet;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.service.Dispatcher;
+import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.awaitility.reflect.WhiteboxImpl;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -98,4 +107,66 @@ public class PersistentDispatcherMultipleConsumersTest 
extends ProducerConsumerB
         consumer.close();
         admin.topics().delete(topicName, false);
     }
+
+    @Test
+    public void testSkipReadEntriesFromCloseCursor() throws Exception {
+        final String topicName =
+                
BrokerTestUtil.newUniqueName("persistent://public/default/testSkipReadEntriesFromCloseCursor");
+        final String subscription = "s1";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        @Cleanup
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        for (int i = 0; i < 10; i++) {
+            producer.send("message-" + i);
+        }
+        producer.close();
+
+        // Get the dispatcher of the topic.
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopic(topicName, false).join().get();
+
+        ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
+        Mockito.doReturn(subscription).when(cursor).getName();
+        Subscription sub = Mockito.mock(PersistentSubscription.class);
+        Mockito.doReturn(topic).when(sub).getTopic();
+        // Mock the dispatcher.
+        PersistentDispatcherMultipleConsumers dispatcher =
+                Mockito.spy(new PersistentDispatcherMultipleConsumers(topic, 
cursor, sub));
+        // Return 10 permits so that the dispatcher can read more entries.
+        
Mockito.doReturn(10).when(dispatcher).getFirstAvailableConsumerPermits();
+
+        // Increment the counter each time scheduleReadEntriesWithDelay(...) is called.
+        AtomicInteger callScheduleReadEntriesWithDelayCnt = new 
AtomicInteger(0);
+        Mockito.doAnswer(inv -> {
+            callScheduleReadEntriesWithDelayCnt.getAndIncrement();
+            return inv.callRealMethod();
+        }).when(dispatcher).scheduleReadEntriesWithDelay(Mockito.any(), 
Mockito.any(), Mockito.anyLong());
+
+        // Increment the counter each time readEntriesFailed(...) is called.
+        AtomicInteger callReadEntriesFailed = new AtomicInteger(0);
+        Mockito.doAnswer(inv -> {
+            callReadEntriesFailed.getAndIncrement();
+            return inv.callRealMethod();
+        }).when(dispatcher).readEntriesFailed(Mockito.any(), Mockito.any());
+
+        Mockito.doReturn(false).when(cursor).isClosed();
+
+        // Mock asyncReadEntriesOrWait(...) to simulate a read against a closed cursor.
+        Mockito.doAnswer(inv -> {
+            PersistentDispatcherMultipleConsumers dispatcher1 = 
inv.getArgument(2);
+            dispatcher1.readEntriesFailed(new 
ManagedLedgerException.CursorAlreadyClosedException("cursor closed"),
+                    null);
+            return null;
+        }).when(cursor).asyncReadEntriesOrWait(Mockito.anyInt(), 
Mockito.anyLong(), Mockito.eq(dispatcher),
+                Mockito.any(), Mockito.any());
+
+        dispatcher.readMoreEntries();
+
+        // Verify: the readEntriesFailed should be called once and the 
scheduleReadEntriesWithDelay should not be called.
+        Assert.assertTrue(callReadEntriesFailed.get() == 1 && 
callScheduleReadEntriesWithDelayCnt.get() == 0);
+
+        // Verify: the topic can be deleted successfully.
+        admin.topics().delete(topicName, false);
+    }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
new file mode 100644
index 00000000000..a4c9e26ffb8
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.Consumer;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Slf4j
+@Test(groups = "broker-api")
+public class PersistentDispatcherSingleActiveConsumerTest extends 
ProducerConsumerBase {
+    @BeforeClass(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testSkipReadEntriesFromCloseCursor() throws Exception {
+        final String topicName =
+                
BrokerTestUtil.newUniqueName("persistent://public/default/testSkipReadEntriesFromCloseCursor");
+        final String subscription = "s1";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        @Cleanup
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
+        for (int i = 0; i < 10; i++) {
+            producer.send("message-" + i);
+        }
+        producer.close();
+
+        // Get the dispatcher of the topic.
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService()
+                .getTopic(topicName, false).join().get();
+
+        ManagedCursor cursor = Mockito.mock(ManagedCursorImpl.class);
+        Mockito.doReturn(subscription).when(cursor).getName();
+        Subscription sub = Mockito.mock(PersistentSubscription.class);
+        Mockito.doReturn(topic).when(sub).getTopic();
+        // Mock the dispatcher.
+        PersistentDispatcherSingleActiveConsumer dispatcher =
+                Mockito.spy(new 
PersistentDispatcherSingleActiveConsumer(cursor, 
CommandSubscribe.SubType.Exclusive,0, topic, sub));
+
+        // Mock a consumer
+        Consumer consumer = Mockito.mock(Consumer.class);
+        consumer.getAvailablePermits();
+        Mockito.doReturn(10).when(consumer).getAvailablePermits();
+        Mockito.doReturn(10).when(consumer).getAvgMessagesPerEntry();
+        Mockito.doReturn("test").when(consumer).consumerName();
+        Mockito.doReturn(true).when(consumer).isWritable();
+        Mockito.doReturn(false).when(consumer).readCompacted();
+
+        // Make this consumer the active consumer.
+        Mockito.doReturn(consumer).when(dispatcher).getActiveConsumer();
+
+        // Increment the counter each time scheduleReadEntriesWithDelay(...) is called.
+        AtomicInteger callScheduleReadEntriesWithDelayCnt = new 
AtomicInteger(0);
+        Mockito.doAnswer(inv -> {
+            callScheduleReadEntriesWithDelayCnt.getAndIncrement();
+            return inv.callRealMethod();
+        }).when(dispatcher).scheduleReadEntriesWithDelay(Mockito.eq(consumer), 
Mockito.anyLong());
+
+        // Increment the counter each time readEntriesFailed(...) is called.
+        AtomicInteger callReadEntriesFailed = new AtomicInteger(0);
+        Mockito.doAnswer(inv -> {
+            callReadEntriesFailed.getAndIncrement();
+            return inv.callRealMethod();
+        }).when(dispatcher).readEntriesFailed(Mockito.any(), Mockito.any());
+
+        Mockito.doReturn(false).when(cursor).isClosed();
+
+        // Mock asyncReadEntriesOrWait(...) to simulate a read against a closed cursor.
+        Mockito.doAnswer(inv -> {
+            PersistentDispatcherSingleActiveConsumer dispatcher1 = 
inv.getArgument(2);
+            dispatcher1.readEntriesFailed(new 
ManagedLedgerException.CursorAlreadyClosedException("cursor closed"),
+                    null);
+            return null;
+        }).when(cursor).asyncReadEntriesOrWait(Mockito.anyInt(), 
Mockito.anyLong(), Mockito.eq(dispatcher),
+                Mockito.any(), Mockito.any());
+
+        dispatcher.readMoreEntries(consumer);
+
+        // Verify: the readEntriesFailed should be called once and the 
scheduleReadEntriesWithDelay should not be called.
+        Assert.assertTrue(callReadEntriesFailed.get() == 1 && 
callScheduleReadEntriesWithDelayCnt.get() == 0);
+
+        // Verify: the topic can be deleted successfully.
+        admin.topics().delete(topicName, false);
+    }
+}

Reply via email to