This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 2cd8d47b60645339f643d7ba53741ef9cd324645 Author: ran <[email protected]> AuthorDate: Tue Sep 7 14:11:05 2021 +0800 [Transaction] add method to clear up transaction buffer snapshot (#11934) (cherry picked from commit d86db3f4ec4fb6bd04216a123cde2fee5c43f9d9) --- .../broker/service/persistent/PersistentTopic.java | 5 +- .../pulsar/broker/systopic/SystemTopicClient.java | 19 ++++++ .../TransactionBufferSystemTopicClient.java | 16 +++++ .../transaction/buffer/TransactionBuffer.java | 7 +++ .../buffer/impl/InMemTransactionBuffer.java | 5 ++ .../buffer/impl/TopicTransactionBuffer.java | 9 +++ .../buffer/impl/TransactionBufferDisable.java | 5 ++ .../TopicTransactionBufferRecoverTest.java | 72 +++++++++++++++++++++- 8 files changed, 136 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a4db0bc..e165916 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util.function.BiFunction; import java.util.stream.Collectors; +import lombok.Getter; import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; @@ -218,6 +219,7 @@ public class PersistentTopic extends AbstractTopic // this future is for publish txn message in order. private volatile CompletableFuture<Void> transactionCompletableFuture; + @Getter protected final TransactionBuffer transactionBuffer; private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder(); @@ -1108,7 +1110,8 @@ public class PersistentTopic extends AbstractTopic CompletableFuture<SchemaVersion> deleteSchemaFuture = deleteSchema ? deleteSchema() : CompletableFuture.completedFuture(null); - deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies()).whenComplete((v, ex) -> { + deleteSchemaFuture.thenAccept(__ -> deleteTopicPolicies()) + .thenCompose(__ -> transactionBuffer.clearSnapshot()).whenComplete((v, ex) -> { if (ex != null) { log.error("[{}] Error deleting topic", topic, ex); unfenceTopicToResume(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java index 3f5a0a9..ceb1df6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/SystemTopicClient.java @@ -104,6 +104,25 @@ public interface SystemTopicClient<T> { CompletableFuture<MessageId> writeAsync(T t); /** + * Delete event in the system topic. + * @param t pulsar event + * @return message id + * @throws PulsarClientException exception while write event cause + */ + default MessageId delete(T t) throws PulsarClientException { + throw new UnsupportedOperationException("Unsupported operation"); + } + + /** + * Async delete event in the system topic. + * @param t pulsar event + * @return message id future + */ + default CompletableFuture<MessageId> deleteAsync(T t) { + throw new UnsupportedOperationException("Unsupported operation"); + } + + /** * Close the system topic writer. */ void close() throws IOException; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java index 81b7096..807bb9d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/TransactionBufferSystemTopicClient.java @@ -105,6 +105,22 @@ public class TransactionBufferSystemTopicClient extends SystemTopicClientBase<Tr } @Override + public MessageId delete(TransactionBufferSnapshot transactionBufferSnapshot) throws PulsarClientException { + return producer.newMessage() + .key(transactionBufferSnapshot.getTopicName()) + .value(null) + .send(); + } + + @Override + public CompletableFuture<MessageId> deleteAsync(TransactionBufferSnapshot transactionBufferSnapshot) { + return producer.newMessage() + .key(transactionBufferSnapshot.getTopicName()) + .value(null) + .sendAsync(); + } + + @Override public void close() throws IOException { this.producer.close(); transactionBufferSystemTopicClient.removeWriter(this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java index c2f6006..6ffc218 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java @@ -125,6 +125,13 @@ public interface TransactionBuffer { CompletableFuture<Void> purgeTxns(List<Long> dataLedgers); /** + * Clear up the snapshot of the TransactionBuffer. + * + * @return Clear up operation result. + */ + CompletableFuture<Void> clearSnapshot(); + + /** * Close the buffer asynchronously. * * @return diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java index 43ed06f..213c7d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBuffer.java @@ -346,6 +346,11 @@ class InMemTransactionBuffer implements TransactionBuffer { } @Override + public CompletableFuture<Void> clearSnapshot() { + return CompletableFuture.completedFuture(null); + } + + @Override public CompletableFuture<Void> closeAsync() { buffers.values().forEach(TxnBuffer::close); return CompletableFuture.completedFuture(null); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java index c33c404..220b432 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java @@ -372,6 +372,15 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen } @Override + public CompletableFuture<Void> clearSnapshot() { + return this.takeSnapshotWriter.thenCompose(writer -> { + TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot(); + snapshot.setTopicName(topic.getName()); + return writer.deleteAsync(snapshot); + }).thenCompose(__ -> CompletableFuture.completedFuture(null)); + } + + @Override public CompletableFuture<Void> closeAsync() { changeToCloseState(); return this.takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java index 4b50e55..ff18924 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java @@ -69,6 +69,11 @@ public class TransactionBufferDisable implements TransactionBuffer { } @Override + public CompletableFuture<Void> clearSnapshot() { + return CompletableFuture.completedFuture(null); + } + + @Override public CompletableFuture<Void> closeAsync() { return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java index d79d3c8..956b86e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.Sets; import java.io.IOException; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.NavigableMap; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -33,9 +34,11 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.commons.collections4.map.LinkedMap; +import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory; import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer; import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot; import org.apache.pulsar.client.api.Consumer; @@ -50,11 +53,11 @@ import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.transaction.TransactionImpl; +import org.apache.pulsar.common.events.EventType; import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; @@ -403,4 +406,71 @@ public class TopicTransactionBufferRecoverTest extends TransactionTestBase { } assertTrue(exist); } + + @Test + public void clearTransactionBufferSnapshotTest() throws Exception { + String topic = NAMESPACE1 + "/tb-snapshot-delete-" + RandomUtils.nextInt(); + + @Cleanup + Producer<byte[]> producer = pulsarClient + .newProducer() + .topic(topic) + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(5, TimeUnit.SECONDS) + .build().get(); + producer.newMessage(txn).value("test".getBytes()).sendAsync(); + producer.newMessage(txn).value("test".getBytes()).sendAsync(); + txn.commit().get(); + + // take snapshot + PersistentTopic originalTopic = (PersistentTopic) getPulsarServiceList().get(0) + .getBrokerService().getTopic(TopicName.get(topic).toString(), false).get().get(); + TopicTransactionBuffer topicTransactionBuffer = (TopicTransactionBuffer) originalTopic.getTransactionBuffer(); + Method takeSnapshotMethod = TopicTransactionBuffer.class.getDeclaredMethod("takeSnapshot"); + takeSnapshotMethod.setAccessible(true); + takeSnapshotMethod.invoke(topicTransactionBuffer); + + TopicName transactionBufferTopicName = + NamespaceEventsSystemTopicFactory.getSystemTopicName( + TopicName.get(topic).getNamespaceObject(), EventType.TRANSACTION_BUFFER_SNAPSHOT); + PersistentTopic snapshotTopic = (PersistentTopic) getPulsarServiceList().get(0) + .getBrokerService().getTopic(transactionBufferTopicName.toString(), false).get().get(); + Field field = PersistentTopic.class.getDeclaredField("currentCompaction"); + field.setAccessible(true); + + // Trigger compaction and make sure it is finished. + checkSnapshotCount(transactionBufferTopicName, true, snapshotTopic, field); + admin.topics().delete(topic, true); + checkSnapshotCount(transactionBufferTopicName, false, snapshotTopic, field); + } + + private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot, + PersistentTopic persistentTopic, Field field) throws Exception { + persistentTopic.triggerCompaction(); + CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) field.get(persistentTopic); + Awaitility.await().untilAsserted(() -> assertTrue(compactionFuture.isDone())); + + Reader<TransactionBufferSnapshot> reader = pulsarClient.newReader(Schema.AVRO(TransactionBufferSnapshot.class)) + .readCompacted(true) + .startMessageId(MessageId.earliest) + .startMessageIdInclusive() + .topic(topicName.toString()) + .create(); + + int count = 0; + while (true) { + Message<TransactionBufferSnapshot> snapshotMsg = reader.readNext(2, TimeUnit.SECONDS); + if (snapshotMsg != null) { + count++; + } else { + break; + } + } + assertTrue(hasSnapshot ? count > 0 : count == 0); + reader.close(); + } + }
