This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9edaa8569de [fix] [broker] Topic can never be loaded up due to broker
maintains a failed topic creation future (#23184)
9edaa8569de is described below
commit 9edaa8569deff9c0cbb41b261fee472603f3df4d
Author: fengyubiao <[email protected]>
AuthorDate: Sun Aug 18 19:02:23 2024 +0800
[fix] [broker] Topic can never be loaded up due to broker maintains a
failed topic creation future (#23184)
---
.../pulsar/broker/service/BrokerService.java | 15 +++-
.../client/api/OrphanPersistentTopicTest.java | 95 ++++++++++++++++++++++
2 files changed, 106 insertions(+), 4 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d13d3b3174a..338d5f420ca 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1664,6 +1664,7 @@ public class BrokerService implements Closeable {
topicFuture.completeExceptionally(new
ServiceUnitNotReadyException(msg));
}
}).exceptionally(ex -> {
+ pulsar.getExecutor().execute(() -> topics.remove(topic,
topicFuture));
topicFuture.completeExceptionally(ex);
return null;
});
@@ -1767,10 +1768,16 @@ public class BrokerService implements Closeable {
long topicLoadLatencyMs =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime())
-
topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
- if
(topicFuture.isCompletedExceptionally()) {
+ if
(!topicFuture.complete(Optional.of(persistentTopic))) {
// Check create persistent
topic timeout.
- log.warn("{} future is already
completed with failure {}, closing the"
- + " topic", topic,
FutureUtil.getException(topicFuture));
+ if
(topicFuture.isCompletedExceptionally()) {
+ log.warn("{} future is
already completed with failure {}, closing"
+ + " the topic", topic,
FutureUtil.getException(topicFuture));
+ } else {
+ // It should not happen.
+ log.error("{} future is
already completed by another thread, "
+ + "which is not
expected. Closing the current one", topic);
+ }
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
topics.remove(topic,
topicFuture);
@@ -1782,7 +1789,6 @@ public class BrokerService implements Closeable {
});
} else {
addTopicToStatsMaps(topicName,
persistentTopic);
-
topicFuture.complete(Optional.of(persistentTopic));
}
})
.exceptionally((ex) -> {
@@ -1811,6 +1817,7 @@ public class BrokerService implements Closeable {
public void openLedgerFailed(ManagedLedgerException
exception, Object ctx) {
if (!createIfMissing && exception instanceof
ManagedLedgerNotFoundException) {
// We were just trying to load a topic and the
topic doesn't exist
+ pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
loadFuture.completeExceptionally(exception);
topicFuture.complete(Optional.empty());
} else {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
index b5af3cc6afd..f60aeb78387 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
@@ -19,13 +19,17 @@
package org.apache.pulsar.client.api;
import static
org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -33,6 +37,7 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
@@ -47,6 +52,7 @@ import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@@ -219,4 +225,93 @@ public class OrphanPersistentTopicTest extends
ProducerConsumerBase {
consumer.close();
admin.topics().delete(tpName, false);
}
+
+ @DataProvider(name = "whetherTimeoutOrNot")
+ public Object[][] whetherTimeoutOrNot() {
+ return new Object[][] {
+ {true},
+ {false}
+ };
+ }
+
+ @Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot")
+ public void testCheckOwnerShipFails(boolean injectTimeout) throws
Exception {
+ if (injectTimeout) {
+ pulsar.getConfig().setTopicLoadTimeoutSeconds(5);
+ }
+ String ns = "public" + "/" +
UUID.randomUUID().toString().replaceAll("-", "");
+ String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns +
"/tp");
+ admin.namespaces().createNamespace(ns);
+ admin.topics().createNonPartitionedTopic(tpName);
+ admin.namespaces().unload(ns);
+
+ // Inject an error when calling
"NamespaceService.isServiceUnitActiveAsync".
+ AtomicInteger failedTimes = new AtomicInteger();
+ NamespaceService namespaceService = pulsar.getNamespaceService();
+ doAnswer(invocation -> {
+ TopicName paramTp = (TopicName) invocation.getArguments()[0];
+ if (paramTp.toString().equalsIgnoreCase(tpName) &&
failedTimes.incrementAndGet() <= 2) {
+ if (injectTimeout) {
+ Thread.sleep(10 * 1000);
+ }
+ log.info("Failed {} times", failedTimes.get());
+ return CompletableFuture.failedFuture(new
RuntimeException("mocked error"));
+ }
+ return invocation.callRealMethod();
+
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
+
+ // Verify: the consumer is eventually created successfully.
+ Consumer consumer =
pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe();
+
+ // cleanup.
+ if (injectTimeout) {
+ pulsar.getConfig().setTopicLoadTimeoutSeconds(60);
+ }
+ consumer.close();
+ admin.topics().delete(tpName);
+ }
+
+ @Test(timeOut = 60 * 1000, dataProvider = "whetherTimeoutOrNot")
+ public void testTopicLoadAndDeleteAtTheSameTime(boolean injectTimeout)
throws Exception {
+ if (injectTimeout) {
+ pulsar.getConfig().setTopicLoadTimeoutSeconds(5);
+ }
+ String ns = "public" + "/" +
UUID.randomUUID().toString().replaceAll("-", "");
+ String tpName = BrokerTestUtil.newUniqueName("persistent://" + ns +
"/tp");
+ admin.namespaces().createNamespace(ns);
+ admin.topics().createNonPartitionedTopic(tpName);
+ admin.namespaces().unload(ns);
+
+ // Inject a race condition: the topic is loaded and deleted at the
same time.
+ AtomicInteger mockRaceConditionCounter = new AtomicInteger();
+ NamespaceService namespaceService = pulsar.getNamespaceService();
+ doAnswer(invocation -> {
+ TopicName paramTp = (TopicName) invocation.getArguments()[0];
+ if (paramTp.toString().equalsIgnoreCase(tpName) &&
mockRaceConditionCounter.incrementAndGet() <= 1) {
+ if (injectTimeout) {
+ Thread.sleep(10 * 1000);
+ }
+ log.info("Race condition occurs {} times",
mockRaceConditionCounter.get());
+
pulsar.getManagedLedgerFactory().delete(TopicName.get(tpName).getPersistenceNamingEncoding());
+ }
+ return invocation.callRealMethod();
+
}).when(namespaceService).isServiceUnitActiveAsync(any(TopicName.class));
+
+ // Expect: loading the topic fails, because this call does not allow the
topic to be created automatically.
+ try {
+ pulsar.getBrokerService().getTopic(tpName, false,
Collections.emptyMap()).join();
+ } catch (Exception ex) {
+ log.warn("Expected error", ex);
+ }
+
+ // Verify: the consumer is created successfully once the topic is allowed
to be created automatically.
+ Consumer consumer =
pulsarClient.newConsumer().topic(tpName).subscriptionName("s1").subscribe();
+
+ // cleanup.
+ if (injectTimeout) {
+ pulsar.getConfig().setTopicLoadTimeoutSeconds(60);
+ }
+ consumer.close();
+ admin.topics().delete(tpName);
+ }
}