This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1fae42d723bcb0515c593ba1cedb0cc9d67578d3 Author: congbo <[email protected]> AuthorDate: Fri Mar 18 11:06:47 2022 +0800 [Transaction] Fix transaction buffer recover BrokerMetadataException close topic (#14709) ### Motivation When TopicTransactionBuffer recover fail throw BrokerMetadataException, we should close this topic, if we don't close the topic, we can't send message because TopicTransactionBuffer recover fail  ### Modifications When recover fail by BrokerMetadataException, close topic ### Verifying this change add test for it (cherry picked from commit c4e4ddd1dae2249938c8ce15e5282301b167cd5e) --- .../buffer/impl/TopicTransactionBuffer.java | 5 ++ .../TopicTransactionBufferRecoverTest.java | 64 +++++++++++++++++++++- .../pulsar/broker/transaction/TransactionTest.java | 3 +- 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index e0a6695..3b28966d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -179,6 +179,11 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen @Override public void recoverExceptionally(Exception e) { + if (e instanceof PulsarClientException.BrokerMetadataException) { + log.warn("Closing topic {} due to read transaction buffer snapshot while recovering the " + + "transaction buffer throw exception", topic.getName(), e); + topic.close(); + } transactionBufferFuture.completeExceptionally(e); } }, this.topic, this)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java index 335cecc..5701d22 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java @@ -33,10 +33,14 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.commons.collections4.map.LinkedMap; import org.apache.commons.lang3.RandomUtils; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TransactionBufferSnapshotService; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; +import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot; import org.apache.pulsar.client.api.Consumer; @@ -44,6 +48,7 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.Schema; @@ -60,7 +65,10 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; - +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -442,4 +450,58 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase { reader.close(); } + + @Test(timeOut=30000) + public void testTransactionBufferRecoverThrowBrokerMetadataException() throws Exception { + String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowBrokerMetadataException"; + @Cleanup + Producer<byte[]> producer = pulsarClient + .newProducer() + .topic(topic) + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build().get(); + + producer.newMessage(txn).value("test".getBytes()).sendAsync(); + producer.newMessage(txn).value("test".getBytes()).sendAsync(); + txn.commit().get(); + + // take snapshot + PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0) + .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get(); + TransactionBufferSnapshotService transactionBufferSnapshotService = + mock(TransactionBufferSnapshotService.class); + SystemTopicClient.Reader<TransactionBufferSnapshot> reader = mock(SystemTopicClient.Reader.class); + // mock reader can't read snapshot fail + doThrow(new PulsarClientException.BrokerMetadataException("")).when(reader).hasMoreEvents(); + doReturn(CompletableFuture.completedFuture(reader)).when(transactionBufferSnapshotService).createReader(any()); + + Field field = PulsarService.class.getDeclaredField("transactionBufferSnapshotService"); + field.setAccessible(true); + TransactionBufferSnapshotService transactionBufferSnapshotServiceOriginal = + (TransactionBufferSnapshotService) field.get(getPulsarServiceList().get(0)); + field.set(getPulsarServiceList().get(0), transactionBufferSnapshotService); + + // recover again will throw BrokerMetadataException then close topic + new TopicTransactionBuffer(originalTopic); + Awaitility.await().untilAsserted(() -> { + // isFenced means closed + Field close = AbstractTopic.class.getDeclaredField("isFenced"); + close.setAccessible(true); + assertTrue((boolean) close.get(originalTopic)); + }); + field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceOriginal); + + // topic recover success + txn = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build().get(); + + producer.newMessage(txn).value("test".getBytes()).sendAsync(); + txn.commit().get(); + } + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 28d4fcf..2b1a2f4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -41,6 +41,7 @@ import java.util.HashMap; import java.util.Map; import java.util.List; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -806,7 +807,7 @@ public class TransactionTest extends TransactionTestBase { public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception { PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0) .getBrokerService() - .getTopic(NAMESPACE1 + "/test", true) + .getTopic(NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true) .get().get(); TransactionBuffer buffer = persistentTopic.getTransactionBuffer(); Field field = TopicTransactionBuffer.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
