This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 127ea2576d6 [fix][txn] Implement compatibility for transaction buffer
segmented snapshot feature upgrade (#20235)
127ea2576d6 is described below
commit 127ea2576d65d72b7906921178565b1a3a9e722e
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue May 16 09:41:35 2023 +0800
[fix][txn] Implement compatibility for transaction buffer segmented
snapshot feature upgrade (#20235)
Master issue: https://github.com/apache/pulsar/issues/16913
## Motivation:
The transaction buffer segmented snapshot feature aims to improve the
transaction buffer's performance by segmenting the snapshot and managing it
more efficiently. However, for existing topics that were created before this
feature was introduced, we need to ensure a seamless transition and
compatibility when enabling the segmented snapshot feature.
## Modifications:
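1. Updated the `recoverFromSnapshot()` method so that, when
`persistentSnapshotIndexes` is null, it falls back to reading the legacy
namespace-level `__transaction_buffer_snapshot` topic (new
`recoverOldSnapshot()` method) instead of returning immediately. This ensures
that aborted transactions recorded in the old snapshot format are recovered
when a topic is upgraded to the segmented snapshot feature.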
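2. Added a new test, `testSnapshotProcessorUpgrade()`, that verifies the
compatibility of the transaction buffer segmented snapshot feature when it is
enabled on an existing topic, and a test,
`testSegmentedSnapshotWithoutCreatingOldSnapshotTopic()`, that verifies no
legacy `__transaction_buffer_snapshot` topic is created in a new namespace
when the feature is enabled.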
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
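This change added tests and can be verified as follows:
- `SegmentAbortedTxnProcessorTest#testSnapshotProcessorUpgrade` writes
non-transactional messages and aborted transactional messages while the
segmented snapshot is disabled. It then enables the feature and unloads the
topic, and checks that exactly one snapshot segment is written. Finally, it
checks that only the non-transactional messages are delivered after
re-subscribing.
- `SegmentAbortedTxnProcessorTest#testSegmentedSnapshotWithoutCreatingOldSnapshotTopic`
checks that the legacy `__transaction_buffer_snapshot` topic is not created in
a new namespace when the segmented snapshot feature is enabled.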
(cherry picked from commit 8b929e6bf1b1718431abebf04420d7817ad8cdb7)
---
.../SnapshotSegmentAbortedTxnProcessorImpl.java | 66 +++++++++-
.../SegmentAbortedTxnProcessorTest.java | 133 ++++++++++++++++++++-
2 files changed, 196 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 751c03aff95..4f4e58ac3f5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -47,6 +47,7 @@ import
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.Refe
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
@@ -265,7 +266,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
PositionImpl finalStartReadCursorPosition =
startReadCursorPosition;
TransactionBufferSnapshotIndexes
finalPersistentSnapshotIndexes = persistentSnapshotIndexes;
if (persistentSnapshotIndexes == null) {
- return CompletableFuture.completedFuture(null);
+ return recoverOldSnapshot();
} else {
this.unsealedTxnIds =
convertTypeToTxnID(persistentSnapshotIndexes
.getSnapshot().getAborts());
@@ -378,6 +379,69 @@ public class SnapshotSegmentAbortedTxnProcessorImpl
implements AbortedTxnProcess
.getExecutor(this));
}
+ // This method will be deprecated and removed in version 4.x.0
+ private CompletableFuture<PositionImpl> recoverOldSnapshot() {
+ return
topic.getBrokerService().getTopic(TopicName.get(topic.getName()).getNamespace()
+ "/"
+ + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT, false)
+ .thenCompose(topicOption -> {
+ if (!topicOption.isPresent()) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ return
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+ .getTxnBufferSnapshotService()
+
.createReader(TopicName.get(topic.getName())).thenComposeAsync(snapshotReader
-> {
+ PositionImpl
startReadCursorPositionInOldSnapshot = null;
+ try {
+ while (snapshotReader.hasMoreEvents())
{
+ Message<TransactionBufferSnapshot>
message = snapshotReader.readNextAsync()
+
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
+ if
(topic.getName().equals(message.getKey())) {
+ TransactionBufferSnapshot
transactionBufferSnapshot =
+ message.getValue();
+ if (transactionBufferSnapshot
!= null) {
+
handleOldSnapshot(transactionBufferSnapshot);
+
startReadCursorPositionInOldSnapshot = PositionImpl.get(
+
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+
transactionBufferSnapshot.getMaxReadPositionEntryId());
+ }
+ }
+ }
+ } catch (TimeoutException ex) {
+ Throwable t =
FutureUtil.unwrapCompletionException(ex);
+ String errorMessage =
String.format("[%s] Transaction buffer recover fail by "
+ + "read
transactionBufferSnapshot timeout!", topic.getName());
+ log.error(errorMessage, t);
+ return FutureUtil.failedFuture(new
BrokerServiceException
+
.ServiceUnitNotReadyException(errorMessage, t));
+ } catch (Exception ex) {
+ log.error("[{}] Transaction buffer
recover fail when read "
+ +
"transactionBufferSnapshot!", topic.getName(), ex);
+ return FutureUtil.failedFuture(ex);
+ } finally {
+ assert snapshotReader != null;
+ closeReader(snapshotReader);
+ }
+ return
CompletableFuture.completedFuture(startReadCursorPositionInOldSnapshot);
+ },
+
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
+ .getExecutor(this));
+ }
+ });
+ }
+
+ // This method will be deprecated and removed in version 4.x.0
+ private void handleOldSnapshot(TransactionBufferSnapshot snapshot) {
+ if (snapshot.getAborts() != null) {
+ snapshot.getAborts().forEach(abortTxnMetadata -> {
+ TxnID txnID = new TxnID(abortTxnMetadata.getTxnIdMostBits(),
+ abortTxnMetadata.getTxnIdLeastBits());
+ aborts.put(txnID, txnID);
+ //The old data will be written into the first segment.
+ unsealedTxnIds.add(txnID);
+ });
+ }
+ }
+
@Override
public CompletableFuture<Void> clearAbortedTxnSnapshot() {
return
persistentWorker.appendTask(PersistentWorker.OperationType.Clear,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
index c157d7cf8c5..cb15ab003f7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.transaction;
+import static org.junit.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -44,10 +45,16 @@ import
org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import
org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -247,8 +254,8 @@ public class SegmentAbortedTxnProcessorTest extends
TransactionTestBase {
private void verifySnapshotSegmentsSize(String topic, int size) throws
Exception {
SystemTopicClient.Reader<TransactionBufferSnapshotSegment> reader =
pulsarService.getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotSegmentService()
- .createReader(TopicName.get(topic)).get();
+ .getTxnBufferSnapshotSegmentService()
+ .createReader(TopicName.get(topic)).get();
int segmentCount = 0;
while (reader.hasMoreEvents()) {
Message<TransactionBufferSnapshotSegment> message =
reader.readNextAsync()
@@ -286,4 +293,126 @@ public class SegmentAbortedTxnProcessorTest extends
TransactionTestBase {
CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>)
field.get(snapshotTopic);
org.awaitility.Awaitility.await().untilAsserted(() ->
assertTrue(compactionFuture.isDone()));
}
+
+ /**
+ * This test verifies the compatibility of the transaction buffer
segmented snapshot feature
+ * when enabled on an existing topic.
+ * It performs the following steps:
+ * 1. Creates a topic with segmented snapshot disabled.
+ * 2. Sends 10 messages without using transactions.
+ * 3. Sends 10 messages using transactions and aborts them.
+ * 4. Sends 10 messages without using transactions.
+ * 5. Verifies that only the non-transactional messages are received.
+ * 6. Enables the segmented snapshot feature and sets the snapshot segment
size.
+ * 7. Unloads the topic.
+ * 8. Sends a new message using a transaction and aborts it.
+ * 9. Verifies that the topic has exactly one segment.
+ * 10. Re-subscribes the consumer and re-verifies that only the
non-transactional messages are received.
+ */
+ @Test
+ public void testSnapshotProcessorUpgrade() throws Exception {
+ this.pulsarService = getPulsarServiceList().get(0);
+
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+ // Create a topic, send 10 messages without using transactions, and
send 10 messages using transactions.
+ // Abort these transactions and verify the data.
+ final String topicName = "persistent://" + NAMESPACE1 +
"/testSnapshotProcessorUpgrade";
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+
+ // Send 10 messages without using transactions
+ for (int i = 0; i < 10; i++) {
+ producer.send(("test-message-" + i).getBytes());
+ }
+
+ // Send 10 messages using transactions and abort them
+ for (int i = 0; i < 10; i++) {
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ producer.newMessage(txn).value(("test-txn-message-" +
i).getBytes()).sendAsync();
+ txn.abort().get();
+ }
+
+ // Send 10 messages without using transactions
+ for (int i = 10; i < 20; i++) {
+ producer.send(("test-message-" + i).getBytes());
+ }
+
+ // Verify the data
+ for (int i = 0; i < 20; i++) {
+ Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertEquals("test-message-" + i, new String(msg.getData()));
+ }
+
+ // Enable segmented snapshot
+
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 +
PROCESSOR_TOPIC.length() +
+ SEGMENT_SIZE * 3);
+
+ // Unload the topic
+ admin.topics().unload(topicName);
+
+ // Sends a new message using a transaction and aborts it.
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ producer.newMessage(txn).value("test-message-new".getBytes()).send();
+ txn.abort().get();
+
+ // Verifies that the topic has exactly one segment.
+ Awaitility.await().untilAsserted(() -> {
+ String segmentTopic = "persistent://" + NAMESPACE1 + "/" +
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS;
+ TopicStats topicStats = admin.topics().getStats(segmentTopic);
+ assertEquals(1, topicStats.getMsgInCounter());
+ });
+
+ // Re-subscribes the consumer and re-verifies that only the
non-transactional messages are received.
+ consumer.close();
+ consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+ for (int i = 0; i < 20; i++) {
+ Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertEquals("test-message-" + i, new String(msg.getData()));
+ }
+ }
+
+ /**
+ * This test verifies that when the segmented snapshot feature is enabled,
creating a new topic
+ * does not create a __transaction_buffer_snapshot topic in the same
namespace.
+ * The test performs the following steps:
+ * 1. Enable the segmented snapshot feature.
+ * 2. Create a new namespace.
+ * 3. Create a new topic in the namespace.
+ * 4. Check that the __transaction_buffer_snapshot topic is not created in
the same namespace.
+ * 5. Destroy the namespace after the test.
+ */
+ @Test
+ public void testSegmentedSnapshotWithoutCreatingOldSnapshotTopic() throws
Exception {
+ // Enable the segmented snapshot feature
+ pulsarService = getPulsarServiceList().get(0);
+
pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+
+ // Create a new namespace
+ String namespaceName =
"tnx/testSegmentedSnapshotWithoutCreatingOldSnapshotTopic";
+ admin.namespaces().createNamespace(namespaceName);
+
+ // Create a new topic in the namespace
+ String topicName = "persistent://" + namespaceName + "/newTopic";
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ producer.close();
+
+ // Check that the __transaction_buffer_snapshot topic is not created
in the same namespace
+ String transactionBufferSnapshotTopic = "persistent://" +
namespaceName + "/" +
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
+ try {
+ admin.topics().getStats(transactionBufferSnapshotTopic);
+ fail("The __transaction_buffer_snapshot topic should not exist");
+ } catch (PulsarAdminException e) {
+ assertEquals(e.getStatusCode(), 404);
+ }
+
+ // Destroy the namespace after the test
+ admin.namespaces().deleteNamespace(namespaceName, true);
+ }
}