This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8eb7ee1e9e9 [fix] Close TransactionBuffer when create persistent topic timeout (#19384)
8eb7ee1e9e9 is described below

commit 8eb7ee1e9e96d4686e452320cdaba92d5eca7b4f
Author: Tao Jiuming <[email protected]>
AuthorDate: Mon Feb 6 15:32:59 2023 +0800

    [fix] Close TransactionBuffer when create persistent topic timeout (#19384)
---
 .../pulsar/broker/service/BrokerService.java       |  7 ++++
 .../buffer/TopicTransactionBufferTest.java         | 41 +++++++++++++++++++++-
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 27a1518cb81..db7a3f16f97 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1625,8 +1625,15 @@ public class BrokerService implements Closeable {
                                                                         - 
topicCreateTimeMs;
                                             
pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs);
                                             if 
(topicFuture.isCompletedExceptionally()) {
+                                                // Check create persistent 
topic timeout.
                                                 log.warn("{} future is already 
completed with failure {}, closing the"
                                                         + " topic", topic, 
FutureUtil.getException(topicFuture));
+                                                
persistentTopic.getTransactionBuffer()
+                                                        .closeAsync()
+                                                        .exceptionally(t -> {
+                                                            log.error("[{}] 
Close transactionBuffer failed", topic, t);
+                                                            return null;
+                                                        });
                                                 
persistentTopic.stopReplProducers()
                                                         .whenCompleteAsync((v, 
exception) -> {
                                                             
topics.remove(topic, topicFuture);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 5a9c928ca3c..aa98fc7d701 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
@@ -42,8 +43,11 @@ import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -116,7 +120,7 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
                     Class<?> topicKlass = inv.getArgument(3);
                     if (topicKlass.equals(PersistentTopic.class)) {
                         PersistentTopic pt = Mockito.spy(new 
PersistentTopic(topic1, ledger, service));
-                        CompletableFuture<Void> f =CompletableFuture
+                        CompletableFuture<Void> f = CompletableFuture
                                 .failedFuture(new ManagedLedgerException("This 
is an exception"));
                         
Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
                         reference.set(pt);
@@ -140,4 +144,39 @@ public class TopicTransactionBufferTest extends TransactionTestBase {
         Assert.assertEquals(ttb.getState(), expectState);
     }
 
+
+    @Test
+    public void testCloseTransactionBufferWhenTimeout() throws Exception {
+        String topic = "persistent://" + NAMESPACE1 + "/test_" + 
UUID.randomUUID();
+        PulsarService pulsar = pulsarServiceList.get(0);
+        BrokerService brokerService0 = pulsar.getBrokerService();
+        BrokerService brokerService = Mockito.spy(brokerService0);
+        AtomicReference<PersistentTopic> reference = new AtomicReference<>();
+        pulsar.getConfiguration().setTopicLoadTimeoutSeconds(10);
+        long topicLoadTimeout = 
TimeUnit.SECONDS.toMillis(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()
 + 1);
+
+        Mockito
+                .doAnswer(inv -> {
+                    Thread.sleep(topicLoadTimeout);
+                    PersistentTopic persistentTopic = (PersistentTopic) 
inv.callRealMethod();
+                    reference.set(persistentTopic);
+                    return persistentTopic;
+                })
+                .when(brokerService)
+                .newTopic(Mockito.eq(topic), Mockito.any(), 
Mockito.eq(brokerService),
+                        Mockito.eq(PersistentTopic.class));
+
+        CompletableFuture<Optional<Topic>> f = brokerService.getTopic(topic, 
true);
+
+        Awaitility.waitAtMost(20, TimeUnit.SECONDS)
+                .pollInterval(Duration.ofSeconds(2)).until(() -> 
reference.get() != null);
+        PersistentTopic persistentTopic = reference.get();
+        TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+        Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
+        TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
+        TopicTransactionBufferState.State expectState = 
TopicTransactionBufferState.State.Close;
+        Assert.assertEquals(ttb.getState(), expectState);
+        Assert.assertTrue(f.isCompletedExceptionally());
+    }
+
 }

Reply via email to