This is an automated email from the ASF dual-hosted git repository.
yubiao pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new e220279f4f1 [fix][broker][branch-2.11] The topic might reference a
closed ledger (#22860) (#23054)
e220279f4f1 is described below
commit e220279f4f1d02c9658ddbdf95a737d5baef9a60
Author: fengyubiao <[email protected]>
AuthorDate: Mon Jul 22 17:33:40 2024 +0800
[fix][broker][branch-2.11] The topic might reference a closed ledger
(#22860) (#23054)
---
.../org/apache/pulsar/broker/PulsarService.java | 5 ++
.../pulsar/broker/service/BrokerService.java | 13 ++++
.../client/api/OrphanPersistentTopicTest.java | 71 ++++++++++++++++++++++
3 files changed, 89 insertions(+)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 15704d49ad3..1aac0ba5e7c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1836,4 +1836,9 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
protected BrokerService newBrokerService(PulsarService pulsar) throws
Exception {
return new BrokerService(pulsar, ioEventLoopGroup);
}
+
+ @VisibleForTesting
+ public void setTransactionExecutorProvider(TransactionBufferProvider
transactionBufferProvider) {
+ this.transactionBufferProvider = transactionBufferProvider;
+ }
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 090d5ce0b54..c0da47755b6 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -988,6 +988,17 @@ public class BrokerService implements Closeable {
try {
CompletableFuture<Optional<Topic>> topicFuture =
topics.get(topicName.toString());
if (topicFuture != null) {
+ if (topicFuture.isCompletedExceptionally()) {
+ try {
+ topicFuture.join();
+ } catch (Exception ex) {
+ Throwable actEx =
FutureUtil.unwrapCompletionException(ex);
+ if (actEx == FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION) {
+ return CompletableFuture.failedFuture(new
TimeoutException("The previous loading task"
+ + " has not finished yet even through it
has timeout, please retry again."));
+ }
+ }
+ }
if (topicFuture.isCompletedExceptionally()
|| (topicFuture.isDone() &&
!topicFuture.getNow(Optional.empty()).isPresent())) {
// Exceptional topics should be recreated.
@@ -1608,6 +1619,7 @@ public class BrokerService implements Closeable {
+ " topic", topic,
FutureUtil.getException(topicFuture));
executor().submit(() -> {
persistentTopic.close().whenComplete((ignore, ex) -> {
+ topics.remove(topic,
topicFuture);
if (ex != null) {
log.warn("[{}] Get
an error when closing topic.",
topic, ex);
@@ -1645,6 +1657,7 @@ public class BrokerService implements Closeable {
if (!createIfMissing && exception instanceof
ManagedLedgerNotFoundException) {
// We were just trying to load a topic and the
topic doesn't exist
topicFuture.complete(Optional.empty());
+ pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
} else {
log.warn("Failed to create topic {}", topic,
exception);
pulsar.getExecutor().execute(() ->
topics.remove(topic, topicFuture));
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
index 4f5ab783374..76e4be48b2f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/OrphanPersistentTopicTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.api;
+import static
org.apache.pulsar.broker.service.persistent.PersistentTopic.DEDUPLICATION_CURSOR_NAME;
+import static org.junit.Assert.assertNotNull;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.List;
@@ -25,13 +27,18 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPoliciesService;
import org.apache.pulsar.broker.service.TopicPolicyListener;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
+import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import
org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferDisable;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.awaitility.Awaitility;
@@ -114,4 +121,68 @@ public class OrphanPersistentTopicTest extends
ProducerConsumerBase {
admin.topics().delete(tpName, false);
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
}
+
+ @Test
+ public void testCloseLedgerThatTopicAfterCreateTimeout() throws Exception {
+ // Make the topic loading timeout faster.
+ long originalTopicLoadTimeoutSeconds =
pulsar.getConfig().getTopicLoadTimeoutSeconds();
+ int topicLoadTimeoutSeconds = 1;
+ pulsar.getConfig().setTopicLoadTimeoutSeconds(topicLoadTimeoutSeconds);
+ pulsar.getConfig().setBrokerDeduplicationEnabled(true);
+ pulsar.getConfig().setTransactionCoordinatorEnabled(true);
+ String tpName =
BrokerTestUtil.newUniqueName("persistent://public/default/tp2");
+
+ // Mock message deduplication recovery speed topicLoadTimeoutSeconds
+ AtomicBoolean stopDelay = new AtomicBoolean();
+ String mlPath = BrokerService.MANAGED_LEDGER_PATH_ZNODE + "/" +
+ TopicName.get(tpName).getPersistenceNamingEncoding() + "/" +
DEDUPLICATION_CURSOR_NAME;
+ mockZooKeeper.delay(topicLoadTimeoutSeconds * 1000, (op, path) -> {
+ if (mlPath.equals(path) && !stopDelay.get()) {
+ log.info("Topic load timeout: " + path);
+ return true;
+ }
+ return false;
+ });
+
+ // First load topic will trigger timeout
+ // The first topic load will trigger a timeout. When the topic closes,
it will call transactionBuffer.close.
+ // Here, we simulate a sleep to ensure that the ledger is not
immediately closed.
+ TransactionBufferProvider mockTransactionBufferProvider = new
TransactionBufferProvider() {
+ @Override
+ public TransactionBuffer newTransactionBuffer(Topic originTopic) {
+ return new TransactionBufferDisable(originTopic) {
+ @SneakyThrows
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ Thread.sleep(500);
+ return super.closeAsync();
+ }
+ };
+ }
+ };
+ TransactionBufferProvider originalTransactionBufferProvider =
pulsar.getTransactionBufferProvider();
+ pulsar.setTransactionExecutorProvider(mockTransactionBufferProvider);
+ CompletableFuture<Optional<Topic>> firstLoad =
pulsar.getBrokerService().getTopic(tpName, true);
+ Awaitility.await().ignoreExceptions().atMost(5, TimeUnit.SECONDS)
+ .pollInterval(100, TimeUnit.MILLISECONDS)
+ // assert first create topic timeout
+ .untilAsserted(() -> {
+ assertTrue(firstLoad.isCompletedExceptionally());
+ });
+
+ // Once the first load topic times out, immediately to load the topic
again.
+ stopDelay.set(true);
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(tpName).create();
+ for (int i = 0; i < 10; i++) {
+ MessageId send = producer.send("msg".getBytes());
+ Thread.sleep(100);
+ assertNotNull(send);
+ }
+
+ // set to back
+
pulsar.setTransactionExecutorProvider(originalTransactionBufferProvider);
+
pulsar.getConfig().setTopicLoadTimeoutSeconds(originalTopicLoadTimeoutSeconds);
+ pulsar.getConfig().setBrokerDeduplicationEnabled(false);
+ pulsar.getConfig().setTransactionCoordinatorEnabled(false);
+ }
}