This is an automated email from the ASF dual-hosted git repository.

yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b2f2b53907e [fix] [broker] Delete topic timeout due to NPE (#21595)
b2f2b53907e is described below

commit b2f2b53907e43d0eb6757bfc4b77bf3db027f251
Author: fengyubiao <[email protected]>
AuthorDate: Tue Nov 21 17:21:51 2023 +0800

    [fix] [broker] Delete topic timeout due to NPE (#21595)
    
    ### Issue:
    There is an NPE that causes the Future of Delay message indexes bucket 
deletion to be no longer complete, which leads to the topic deletion timeout. 
You can reproduce this issue by the test 
`testDeletePartitionedTopicIfCursorPropsEmpty` and 
`testDeleteTopicIfCursorPropsEmpty`
    
    ### Modifications
    Fix the NPE.
---
 .../bookkeeper/mledger/impl/ManagedCursorImpl.java |   2 +-
 .../BucketDelayedDeliveryTrackerFactory.java       |   4 +
 .../bucket/BucketDelayedDeliveryTracker.java       |   7 +-
 .../persistent/BucketDelayedDeliveryTest.java      | 123 +++++++++++++++++++++
 4 files changed, 134 insertions(+), 2 deletions(-)

diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 39f56a1ad60..4b65d62f0ee 100644
--- 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -690,7 +690,7 @@ public class ManagedCursorImpl implements ManagedCursor {
             position = ledger.getLastPosition();
         }
         log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), 
name, position);
-        this.cursorProperties = cursorProperties;
+        this.cursorProperties = cursorProperties == null ? 
Collections.emptyMap() : cursorProperties;
         messagesConsumedCounter = 
-getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
         markDeletePosition = position;
         persistentMarkDeletePosition = position;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index 157fda8acc6..17d9795dd90 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
@@ -85,6 +86,9 @@ public class BucketDelayedDeliveryTrackerFactory implements 
DelayedDeliveryTrack
      */
     public CompletableFuture<Void> cleanResidualSnapshots(ManagedCursor 
cursor) {
         Map<String, String> cursorProperties = cursor.getCursorProperties();
+        if (MapUtils.isEmpty(cursorProperties)) {
+            return CompletableFuture.completedFuture(null);
+        }
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create();
         cursorProperties.forEach((k, v) -> {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 67a7de1f013..d7a3e80f086 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -50,6 +50,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.mutable.MutableLong;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
@@ -137,9 +138,13 @@ public class BucketDelayedDeliveryTracker extends 
AbstractDelayedDeliveryTracker
 
     private synchronized long recoverBucketSnapshot() throws RuntimeException {
         ManagedCursor cursor = this.lastMutableBucket.getCursor();
+        Map<String, String> cursorProperties = cursor.getCursorProperties();
+        if (MapUtils.isEmpty(cursorProperties)) {
+            return 0;
+        }
         FutureUtil.Sequencer<Void> sequencer = 
this.lastMutableBucket.getSequencer();
         Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap = new 
HashMap<>();
-        cursor.getCursorProperties().keySet().forEach(key -> {
+        cursorProperties.keySet().forEach(key -> {
             if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) {
                 String[] keys = key.split(DELIMITER);
                 checkArgument(keys.length == 3);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index 0a82b2b4c3c..54fec3934dd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -20,10 +20,13 @@ package org.apache.pulsar.broker.service.persistent;
 
 import static 
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import com.google.common.collect.Multimap;
 import java.io.ByteArrayOutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -32,6 +35,7 @@ import lombok.Cleanup;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.pulsar.broker.BrokerTestUtil;
@@ -40,6 +44,7 @@ import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -47,6 +52,7 @@ import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker")
@@ -353,4 +359,121 @@ public class BucketDelayedDeliveryTest extends 
DelayedDeliveryTest {
             }
         }
     }
+
+    @DataProvider(name = "subscriptionTypes")
+    public Object[][] subscriptionTypes() {
+        return new Object[][]{
+                {SubscriptionType.Shared},
+                {SubscriptionType.Key_Shared},
+                {SubscriptionType.Failover},
+                {SubscriptionType.Exclusive},
+        };
+    }
+
+    /**
+     * see: https://github.com/apache/pulsar/pull/21595.
+     */
+    @Test(dataProvider = "subscriptionTypes")
+    public void testDeleteTopicIfCursorPropsEmpty(SubscriptionType 
subscriptionType) throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String subscriptionName = "s1";
+        // create a topic.
+        admin.topics().createNonPartitionedTopic(topic);
+        // create a subscription without props.
+        admin.topics().createSubscription(topic, subscriptionName, 
MessageId.earliest);
+        
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName)
+                .subscriptionType(subscriptionType).subscribe().close();
+        ManagedCursorImpl cursor = findCursor(topic, subscriptionName);
+        assertNotNull(cursor);
+        assertTrue(cursor.getCursorProperties() == null || 
cursor.getCursorProperties().isEmpty());
+        // Test topic deletion is successful.
+        admin.topics().delete(topic);
+    }
+
+    /**
+     * see: https://github.com/apache/pulsar/pull/21595.
+     */
+    @Test(dataProvider = "subscriptionTypes")
+    public void testDeletePartitionedTopicIfCursorPropsEmpty(SubscriptionType 
subscriptionType) throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String subscriptionName = "s1";
+        // create a topic.
+        admin.topics().createPartitionedTopic(topic, 2);
+        // create a subscription without props.
+        admin.topics().createSubscription(topic, subscriptionName, 
MessageId.earliest);
+        
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName)
+                .subscriptionType(subscriptionType).subscribe().close();
+        ManagedCursorImpl cursor = findCursor(topic + "-partition-0", 
subscriptionName);
+        assertNotNull(cursor);
+        assertTrue(cursor.getCursorProperties() == null || 
cursor.getCursorProperties().isEmpty());
+        // Test topic deletion is successful.
+        admin.topics().deletePartitionedTopic(topic);
+    }
+
+    /**
+     * see: https://github.com/apache/pulsar/pull/21595.
+     */
+    @Test(dataProvider = "subscriptionTypes")
+    public void testDeleteTopicIfCursorPropsNotEmpty(SubscriptionType 
subscriptionType) throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String subscriptionName = "s1";
+        // create a topic.
+        admin.topics().createNonPartitionedTopic(topic);
+        // create a subscription without props.
+        admin.topics().createSubscription(topic, subscriptionName, 
MessageId.earliest);
+        
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName)
+                .subscriptionType(subscriptionType).subscribe().close();
+        ManagedCursorImpl cursor = findCursor(topic, subscriptionName);
+        assertNotNull(cursor);
+        assertTrue(cursor.getCursorProperties() == null || 
cursor.getCursorProperties().isEmpty());
+        // Put a subscription prop.
+        Map<String,String> properties = new HashMap<>();
+        properties.put("ignore", "ignore");
+        admin.topics().updateSubscriptionProperties(topic, subscriptionName, 
properties);
+        assertTrue(cursor.getCursorProperties() != null && 
!cursor.getCursorProperties().isEmpty());
+        // Test topic deletion is successful.
+        admin.topics().delete(topic);
+    }
+
+    /**
+     * see: https://github.com/apache/pulsar/pull/21595.
+     */
+    @Test(dataProvider = "subscriptionTypes")
+    public void 
testDeletePartitionedTopicIfCursorPropsNotEmpty(SubscriptionType 
subscriptionType) throws Exception {
+        final String topic = 
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+        final String subscriptionName = "s1";
+        // create a topic.
+        admin.topics().createPartitionedTopic(topic, 2);
+        pulsarClient.newProducer().topic(topic).create().close();
+        // create a subscription without props.
+        admin.topics().createSubscription(topic, subscriptionName, 
MessageId.earliest);
+        
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName)
+                .subscriptionType(subscriptionType).subscribe().close();
+
+        ManagedCursorImpl cursor = findCursor(topic + "-partition-0", 
subscriptionName);
+        assertNotNull(cursor);
+        assertTrue(cursor.getCursorProperties() == null || 
cursor.getCursorProperties().isEmpty());
+        // Put a subscription prop.
+        Map<String,String> properties = new HashMap<>();
+        properties.put("ignore", "ignore");
+        admin.topics().updateSubscriptionProperties(topic, subscriptionName, 
properties);
+        assertTrue(cursor.getCursorProperties() != null && 
!cursor.getCursorProperties().isEmpty());
+        // Test topic deletion is successful.
+        admin.topics().deletePartitionedTopic(topic);
+    }
+
+
+    private ManagedCursorImpl findCursor(String topic, String 
subscriptionName) {
+        PersistentTopic persistentTopic =
+                (PersistentTopic) pulsar.getBrokerService().getTopic(topic, 
false).join().get();
+        Iterator<ManagedCursor> cursorIterator = 
persistentTopic.getManagedLedger().getCursors().iterator();
+        while (cursorIterator.hasNext()) {
+            ManagedCursor managedCursor = cursorIterator.next();
+            if (managedCursor == null || 
!managedCursor.getName().equals(subscriptionName)) {
+                continue;
+            }
+            return (ManagedCursorImpl) managedCursor;
+        }
+        return null;
+    }
 }

Reply via email to