This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new cbb17e701e7 Close TransactionBuffer when MessageDeduplication#checkStatus failed (#19289)
cbb17e701e7 is described below

commit cbb17e701e76e21b03677d73ff49e2861371402b
Author: Tao Jiuming <[email protected]>
AuthorDate: Tue Jan 31 11:57:59 2023 +0800

    Close TransactionBuffer when MessageDeduplication#checkStatus failed (#19289)
---
 .../pulsar/broker/service/BrokerService.java       | 17 ++++++-
 .../buffer/TopicTransactionBufferTest.java         | 54 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b437f976837..4151887be3f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1385,6 +1385,14 @@ public class BrokerService implements Closeable {
                 });
     }
 
+
+    @VisibleForTesting
+    public void createPersistentTopic0(final String topic, boolean 
createIfMissing,
+                                       CompletableFuture<Optional<Topic>> 
topicFuture,
+                                       Map<String, String> properties) {
+        createPersistentTopic(topic, createIfMissing, topicFuture, properties);
+    }
+
     private void createPersistentTopic(final String topic, boolean 
createIfMissing,
                                        CompletableFuture<Optional<Topic>> 
topicFuture,
                                        Map<String, String> properties) {
@@ -1459,6 +1467,12 @@ public class BrokerService implements Closeable {
                                         .exceptionally((ex) -> {
                                             log.warn("Replication or dedup 
check failed."
                                                     + " Removing topic from 
topics list {}, {}", topic, ex);
+                                            
persistentTopic.getTransactionBuffer()
+                                                    .closeAsync()
+                                                    .exceptionally(t -> {
+                                                        log.error("[{}] Close 
transactionBuffer failed", topic, t);
+                                                        return null;
+                                                    });
                                             
persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
                                                 topics.remove(topic, 
topicFuture);
                                                 
topicFuture.completeExceptionally(ex);
@@ -3187,7 +3201,8 @@ public class BrokerService implements Closeable {
     }
 
     @SuppressWarnings("unchecked")
-    private <T extends Topic> T newTopic(String topic, ManagedLedger ledger, 
BrokerService brokerService,
+    @VisibleForTesting
+    public <T extends Topic> T newTopic(String topic, ManagedLedger ledger, 
BrokerService brokerService,
             Class<T> topicClazz) throws PulsarServerException {
         if (topicFactory != null) {
             try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 576ef647248..ee2649fb5c4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -18,9 +18,16 @@
  */
 package org.apache.pulsar.broker.transaction.buffer;
 
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.common.naming.TopicName;
@@ -30,11 +37,17 @@ import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
 import 
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
 import org.awaitility.Awaitility;
 import org.powermock.reflect.Whitebox;
+import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import java.util.Collections;
 import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TopicTransactionBufferTest extends TransactionTestBase {
 
@@ -86,4 +99,45 @@ public class TopicTransactionBufferTest extends 
TransactionTestBase {
         Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state", 
ManagedLedgerImpl.State.WriteFailed);
         txn.commit().get();
     }
+
+    @Test
+    public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws 
Exception {
+        String topic = "persistent://" + NAMESPACE1 + "/test_" + 
UUID.randomUUID();
+        PulsarService pulsar = pulsarServiceList.get(0);
+        BrokerService brokerService0 = pulsar.getBrokerService();
+        BrokerService brokerService = Mockito.spy(brokerService0);
+        AtomicReference<PersistentTopic> reference = new AtomicReference<>();
+
+        Mockito
+                .doAnswer(inv -> {
+                    String topic1 = inv.getArgument(0);
+                    ManagedLedger ledger = inv.getArgument(1);
+                    BrokerService service = inv.getArgument(2);
+                    Class<?> topicKlass = inv.getArgument(3);
+                    if (topicKlass.equals(PersistentTopic.class)) {
+                        PersistentTopic pt = Mockito.spy(new 
PersistentTopic(topic1, ledger, service));
+                        CompletableFuture<Void> f =CompletableFuture
+                                .failedFuture(new ManagedLedgerException("This 
is an exception"));
+                        
Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
+                        reference.set(pt);
+                        return pt;
+                    } else {
+                        return new NonPersistentTopic(topic1, service);
+                    }
+                })
+                .when(brokerService)
+                .newTopic(Mockito.eq(topic), Mockito.any(), 
Mockito.eq(brokerService),
+                        Mockito.eq(PersistentTopic.class));
+
+        brokerService.createPersistentTopic0(topic, true, new 
CompletableFuture<>(), Collections.emptyMap());
+
+        Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get() 
!= null);
+        PersistentTopic persistentTopic = reference.get();
+        TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+        Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
+        TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
+        TopicTransactionBufferState.State expectState = 
TopicTransactionBufferState.State.Close;
+        Assert.assertEquals(ttb.getState(), expectState);
+    }
+
 }

Reply via email to