This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new cbb17e701e7 Close TransactionBuffer when
MessageDeduplication#checkStatus failed (#19289)
cbb17e701e7 is described below
commit cbb17e701e76e21b03677d73ff49e2861371402b
Author: Tao Jiuming <[email protected]>
AuthorDate: Tue Jan 31 11:57:59 2023 +0800
Close TransactionBuffer when MessageDeduplication#checkStatus failed
(#19289)
---
.../pulsar/broker/service/BrokerService.java | 17 ++++++-
.../buffer/TopicTransactionBufferTest.java | 54 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b437f976837..4151887be3f 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1385,6 +1385,14 @@ public class BrokerService implements Closeable {
});
}
+
+ @VisibleForTesting
+ public void createPersistentTopic0(final String topic, boolean
createIfMissing,
+ CompletableFuture<Optional<Topic>>
topicFuture,
+ Map<String, String> properties) {
+ createPersistentTopic(topic, createIfMissing, topicFuture, properties);
+ }
+
private void createPersistentTopic(final String topic, boolean
createIfMissing,
CompletableFuture<Optional<Topic>>
topicFuture,
Map<String, String> properties) {
@@ -1459,6 +1467,12 @@ public class BrokerService implements Closeable {
.exceptionally((ex) -> {
log.warn("Replication or dedup
check failed."
+ " Removing topic from
topics list {}, {}", topic, ex);
+
persistentTopic.getTransactionBuffer()
+ .closeAsync()
+ .exceptionally(t -> {
+ log.error("[{}] Close
transactionBuffer failed", topic, t);
+ return null;
+ });
persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> {
topics.remove(topic,
topicFuture);
topicFuture.completeExceptionally(ex);
@@ -3187,7 +3201,8 @@ public class BrokerService implements Closeable {
}
@SuppressWarnings("unchecked")
- private <T extends Topic> T newTopic(String topic, ManagedLedger ledger,
BrokerService brokerService,
+ @VisibleForTesting
+ public <T extends Topic> T newTopic(String topic, ManagedLedger ledger,
BrokerService brokerService,
Class<T> topicClazz) throws PulsarServerException {
if (topicFactory != null) {
try {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
index 576ef647248..ee2649fb5c4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TopicTransactionBufferTest.java
@@ -18,9 +18,16 @@
*/
package org.apache.pulsar.broker.transaction.buffer;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
+import
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.TopicName;
@@ -30,11 +37,17 @@ import
org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import
org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.powermock.reflect.Whitebox;
+import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.util.Collections;
import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
public class TopicTransactionBufferTest extends TransactionTestBase {
@@ -86,4 +99,45 @@ public class TopicTransactionBufferTest extends
TransactionTestBase {
Whitebox.setInternalState(persistentTopic.getManagedLedger(), "state",
ManagedLedgerImpl.State.WriteFailed);
txn.commit().get();
}
+
+ @Test
+ public void testCheckDeduplicationFailedWhenCreatePersistentTopic() throws
Exception {
+ String topic = "persistent://" + NAMESPACE1 + "/test_" +
UUID.randomUUID();
+ PulsarService pulsar = pulsarServiceList.get(0);
+ BrokerService brokerService0 = pulsar.getBrokerService();
+ BrokerService brokerService = Mockito.spy(brokerService0);
+ AtomicReference<PersistentTopic> reference = new AtomicReference<>();
+
+ Mockito
+ .doAnswer(inv -> {
+ String topic1 = inv.getArgument(0);
+ ManagedLedger ledger = inv.getArgument(1);
+ BrokerService service = inv.getArgument(2);
+ Class<?> topicKlass = inv.getArgument(3);
+ if (topicKlass.equals(PersistentTopic.class)) {
+ PersistentTopic pt = Mockito.spy(new
PersistentTopic(topic1, ledger, service));
+ CompletableFuture<Void> f =CompletableFuture
+ .failedFuture(new ManagedLedgerException("This
is an exception"));
+
Mockito.doReturn(f).when(pt).checkDeduplicationStatus();
+ reference.set(pt);
+ return pt;
+ } else {
+ return new NonPersistentTopic(topic1, service);
+ }
+ })
+ .when(brokerService)
+ .newTopic(Mockito.eq(topic), Mockito.any(),
Mockito.eq(brokerService),
+ Mockito.eq(PersistentTopic.class));
+
+ brokerService.createPersistentTopic0(topic, true, new
CompletableFuture<>(), Collections.emptyMap());
+
+ Awaitility.waitAtMost(1, TimeUnit.MINUTES).until(() -> reference.get()
!= null);
+ PersistentTopic persistentTopic = reference.get();
+ TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
+ Assert.assertTrue(buffer instanceof TopicTransactionBuffer);
+ TopicTransactionBuffer ttb = (TopicTransactionBuffer) buffer;
+ TopicTransactionBufferState.State expectState =
TopicTransactionBufferState.State.Close;
+ Assert.assertEquals(ttb.getState(), expectState);
+ }
+
}