This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8eb7ee1e9e9 [fix] Close TransactionBuffer when create persistent topic timeout (#19384)
8eb7ee1e9e9 is described below
commit 8eb7ee1e9e96d4686e452320cdaba92d5eca7b4f
Author: Tao Jiuming <[email protected]>
AuthorDate: Mon Feb 6 15:32:59 2023 +0800
[fix] Close TransactionBuffer when create persistent topic timeout (#19384)
---
.../pulsar/broker/service/BrokerService.java | 7 ++++
.../buffer/TopicTransactionBufferTest.java | 41 +++++++++++++++++++++-
2 files changed, 47 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 27a1518cb81..db7a3f16f97 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1625,8 +1625,15 @@ public class BrokerService implements Closeable {
-
topicCreateTimeMs;
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
if
(topicFuture.isCompletedExceptionally()) {
+ // Check create persistent
topic timeout.
log.warn("{} future is already
completed with failure {}, closing the"
+ " topic", topic,
FutureUtil.getException(topicFuture));
+
persistentTopic.getTransactionBuffer()
+ .closeAsync()
+ .exceptionally(t -> {
+ log.error("[{}]
Close transactionBuffer failed", topic, t);
+ return null;
+ });
persistentTopic.stopReplProducers()
.whenCompleteAsync((v,
exception) -> {
topics.remove(topic, topicFuture);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 5a9c928ca3c..aa98fc7d701 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -42,8 +43,11 @@ import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+
+import java.time.Duration;
import java.util.Collections;
import java.util.Map;
+import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -116,7 +120,7 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
Class<?> topicKlass = inv.getArgument(3);
if (topicKlass.equals(PersistentTopic.class)) {
PersistentTopic pt = Mockito.spy(new
PersistentTopic(topic1, ledger, service));
- CompletableFuture<Void> f =CompletableFuture
+ CompletableFuture<Void> f = CompletableFuture
.failedFuture(new ManagedLedgerException("This
is an exception"));
Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
reference.set(pt);
@@ -140,4 +144,39 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
Assert.assertEquals(ttb.getState(), expectState);
}
+
+ @Test
+ public void testCloseTransactionBufferWhenTimeout() throws Exception {
+ String topic = "persistent://" + NAMESPACE1 + "/test_" +
UUID.randomUUID();
+ PulsarService pulsar = pulsarServiceList.get(0);
+ BrokerService brokerService0 = pulsar.getBrokerService();
+ BrokerService brokerService = Mockito.spy(brokerService0);
+ AtomicReference<PersistentTopic> reference = new AtomicReference<>();
+ pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10);
+ long topicLoadTimeout =
TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()
+ 1);
+
+ Mockito
+ .doAnswer(inv -> {
+ Thread.sleep(topicLoadTimeout);
+ PersistentTopic persistentTopic = (PersistentTopic)
inv.callRealMethod();
+ reference.set(persistentTopic);
+ return persistentTopic;
+ })
+ .when(brokerService)
+ .newTopic(Mockito.eq(topic), Mockito.any(),
Mockito.eq(brokerService),
+ Mockito.eq(PersistentTopic.class));
+
+ CompletableFuture<Optional<Topic>> f = brokerService.getTopic(topic,
true);
+
+ Awaitility.waitAtMost(20, TimeUnit.SECONDS)
+ .pollInterval(Duration.ofSeconds(2)).until(() ->
reference.get() != null);
+ PersistentTopic persistentTopic = reference.get();
+ TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+ Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
+ TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
+ TopicTransactionBufferState.State expectState =
TopicTransactionBufferState.State.Close;
+ Assert.assertEquals(ttb.getState(), expectState);
+ Assert.assertTrue(f.isCompletedExceptionally());
+ }
+
}