This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b2f2b53907e [fix] [broker] Delete topic timeout due to NPE (#21595)
b2f2b53907e is described below
commit b2f2b53907e43d0eb6757bfc4b77bf3db027f251
Author: fengyubiao <[email protected]>
AuthorDate: Tue Nov 21 17:21:51 2023 +0800
[fix] [broker] Delete topic timeout due to NPE (#21595)
### Issue:
An NPE prevents the future that tracks the deletion of delayed-message index
buckets from ever completing, which causes topic deletion to time out.
You can reproduce this issue with the tests
`testDeletePartitionedTopicIfCursorPropsEmpty` and
`testDeleteTopicIfCursorPropsEmpty`.
### Modifications
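Fix the NPE: `ManagedCursorImpl` now falls back to an empty map when the
recovered cursor properties are null, and both
`BucketDelayedDeliveryTrackerFactory#cleanResidualSnapshots` and
`BucketDelayedDeliveryTracker#recoverBucketSnapshot` return early when the
cursor properties are null or empty.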
---
.../bookkeeper/mledger/impl/ManagedCursorImpl.java | 2 +-
.../BucketDelayedDeliveryTrackerFactory.java | 4 +
.../bucket/BucketDelayedDeliveryTracker.java | 7 +-
.../persistent/BucketDelayedDeliveryTest.java | 123 +++++++++++++++++++++
4 files changed, 134 insertions(+), 2 deletions(-)
diff --git
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 39f56a1ad60..4b65d62f0ee 100644
---
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -690,7 +690,7 @@ public class ManagedCursorImpl implements ManagedCursor {
position = ledger.getLastPosition();
}
log.info("[{}] Cursor {} recovered to position {}", ledger.getName(),
name, position);
- this.cursorProperties = cursorProperties;
+ this.cursorProperties = cursorProperties == null ?
Collections.emptyMap() : cursorProperties;
messagesConsumedCounter =
-getNumberOfEntries(Range.openClosed(position, ledger.getLastPosition()));
markDeletePosition = position;
persistentMarkDeletePosition = position;
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
index 157fda8acc6..17d9795dd90 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
+import org.apache.commons.collections4.MapUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.bucket.BookkeeperBucketSnapshotStorage;
@@ -85,6 +86,9 @@ public class BucketDelayedDeliveryTrackerFactory implements
DelayedDeliveryTrack
*/
public CompletableFuture<Void> cleanResidualSnapshots(ManagedCursor
cursor) {
Map<String, String> cursorProperties = cursor.getCursorProperties();
+ if (MapUtils.isEmpty(cursorProperties)) {
+ return CompletableFuture.completedFuture(null);
+ }
List<CompletableFuture<Void>> futures = new ArrayList<>();
FutureUtil.Sequencer<Void> sequencer = FutureUtil.Sequencer.create();
cursorProperties.forEach((k, v) -> {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
index 67a7de1f013..d7a3e80f086 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@@ -50,6 +50,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
@@ -137,9 +138,13 @@ public class BucketDelayedDeliveryTracker extends
AbstractDelayedDeliveryTracker
private synchronized long recoverBucketSnapshot() throws RuntimeException {
ManagedCursor cursor = this.lastMutableBucket.getCursor();
+ Map<String, String> cursorProperties = cursor.getCursorProperties();
+ if (MapUtils.isEmpty(cursorProperties)) {
+ return 0;
+ }
FutureUtil.Sequencer<Void> sequencer =
this.lastMutableBucket.getSequencer();
Map<Range<Long>, ImmutableBucket> toBeDeletedBucketMap = new
HashMap<>();
- cursor.getCursorProperties().keySet().forEach(key -> {
+ cursorProperties.keySet().forEach(key -> {
if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) {
String[] keys = key.split(DELIMITER);
checkArgument(keys.length == 3);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
index 0a82b2b4c3c..54fec3934dd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/BucketDelayedDeliveryTest.java
@@ -20,10 +20,13 @@ package org.apache.pulsar.broker.service.persistent;
import static
org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Multimap;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -32,6 +35,7 @@ import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -40,6 +44,7 @@ import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -47,6 +52,7 @@ import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
@@ -353,4 +359,121 @@ public class BucketDelayedDeliveryTest extends
DelayedDeliveryTest {
}
}
}
+
+ @DataProvider(name = "subscriptionTypes")
+ public Object[][] subscriptionTypes() {
+ return new Object[][]{
+ {SubscriptionType.Shared},
+ {SubscriptionType.Key_Shared},
+ {SubscriptionType.Failover},
+ {SubscriptionType.Exclusive},
+ };
+ }
+
+ /**
+ * see: https://github.com/apache/pulsar/pull/21595.
+ */
+ @Test(dataProvider = "subscriptionTypes")
+ public void testDeleteTopicIfCursorPropsEmpty(SubscriptionType
subscriptionType) throws Exception {
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ final String subscriptionName = "s1";
+ // create a topic.
+ admin.topics().createNonPartitionedTopic(topic);
+ // create a subscription without props.
+ admin.topics().createSubscription(topic, subscriptionName,
MessageId.earliest);
+
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName)
+ .subscriptionType(subscriptionType).subscribe().close();
+ ManagedCursorImpl cursor = findCursor(topic, subscriptionName);
+ assertNotNull(cursor);
+ assertTrue(cursor.getCursorProperties() == null ||
cursor.getCursorProperties().isEmpty());
+ // Test topic deletion is successful.
+ admin.topics().delete(topic);
+ }
+
+ /**
+ * see: https://github.com/apache/pulsar/pull/21595.
+ */
+ @Test(dataProvider = "subscriptionTypes")
+ public void testDeletePartitionedTopicIfCursorPropsEmpty(SubscriptionType
subscriptionType) throws Exception {
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ final String subscriptionName = "s1";
+ // create a topic.
+ admin.topics().createPartitionedTopic(topic, 2);
+ // create a subscription without props.
+ admin.topics().createSubscription(topic, subscriptionName,
MessageId.earliest);
+
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName)
+ .subscriptionType(subscriptionType).subscribe().close();
+ ManagedCursorImpl cursor = findCursor(topic + "-partition-0",
subscriptionName);
+ assertNotNull(cursor);
+ assertTrue(cursor.getCursorProperties() == null ||
cursor.getCursorProperties().isEmpty());
+ // Test topic deletion is successful.
+ admin.topics().deletePartitionedTopic(topic);
+ }
+
+ /**
+ * see: https://github.com/apache/pulsar/pull/21595.
+ */
+ @Test(dataProvider = "subscriptionTypes")
+ public void testDeleteTopicIfCursorPropsNotEmpty(SubscriptionType
subscriptionType) throws Exception {
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ final String subscriptionName = "s1";
+ // create a topic.
+ admin.topics().createNonPartitionedTopic(topic);
+ // create a subscription without props.
+ admin.topics().createSubscription(topic, subscriptionName,
MessageId.earliest);
+
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName)
+ .subscriptionType(subscriptionType).subscribe().close();
+ ManagedCursorImpl cursor = findCursor(topic, subscriptionName);
+ assertNotNull(cursor);
+ assertTrue(cursor.getCursorProperties() == null ||
cursor.getCursorProperties().isEmpty());
+ // Put a subscription prop.
+ Map<String,String> properties = new HashMap<>();
+ properties.put("ignore", "ignore");
+ admin.topics().updateSubscriptionProperties(topic, subscriptionName,
properties);
+ assertTrue(cursor.getCursorProperties() != null &&
!cursor.getCursorProperties().isEmpty());
+ // Test topic deletion is successful.
+ admin.topics().delete(topic);
+ }
+
+ /**
+ * see: https://github.com/apache/pulsar/pull/21595.
+ */
+ @Test(dataProvider = "subscriptionTypes")
+ public void
testDeletePartitionedTopicIfCursorPropsNotEmpty(SubscriptionType
subscriptionType) throws Exception {
+ final String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp_");
+ final String subscriptionName = "s1";
+ // create a topic.
+ admin.topics().createPartitionedTopic(topic, 2);
+ pulsarClient.newProducer().topic(topic).create().close();
+ // create a subscription without props.
+ admin.topics().createSubscription(topic, subscriptionName,
MessageId.earliest);
+
pulsarClient.newConsumer().topic(topic).subscriptionName(subscriptionName)
+ .subscriptionType(subscriptionType).subscribe().close();
+
+ ManagedCursorImpl cursor = findCursor(topic + "-partition-0",
subscriptionName);
+ assertNotNull(cursor);
+ assertTrue(cursor.getCursorProperties() == null ||
cursor.getCursorProperties().isEmpty());
+ // Put a subscription prop.
+ Map<String,String> properties = new HashMap<>();
+ properties.put("ignore", "ignore");
+ admin.topics().updateSubscriptionProperties(topic, subscriptionName,
properties);
+ assertTrue(cursor.getCursorProperties() != null &&
!cursor.getCursorProperties().isEmpty());
+ // Test topic deletion is successful.
+ admin.topics().deletePartitionedTopic(topic);
+ }
+
+
+ private ManagedCursorImpl findCursor(String topic, String
subscriptionName) {
+ PersistentTopic persistentTopic =
+ (PersistentTopic) pulsar.getBrokerService().getTopic(topic,
false).join().get();
+ Iterator<ManagedCursor> cursorIterator =
persistentTopic.getManagedLedger().getCursors().iterator();
+ while (cursorIterator.hasNext()) {
+ ManagedCursor managedCursor = cursorIterator.next();
+ if (managedCursor == null ||
!managedCursor.getName().equals(subscriptionName)) {
+ continue;
+ }
+ return (ManagedCursorImpl) managedCursor;
+ }
+ return null;
+ }
}