This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 127ea2576d6 [fix][txn] Implement compatibility for transaction buffer 
segmented snapshot feature upgrade (#20235)
127ea2576d6 is described below

commit 127ea2576d65d72b7906921178565b1a3a9e722e
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue May 16 09:41:35 2023 +0800

    [fix][txn] Implement compatibility for transaction buffer segmented 
snapshot feature upgrade (#20235)
    
    Master issue: https://github.com/apache/pulsar/issues/16913
    ## Motivation:
    The transaction buffer segmented snapshot feature aims to improve the 
transaction buffer's performance by segmenting the snapshot and managing it 
more efficiently. However, for existing topics that were created before this 
feature was introduced, we need to ensure a seamless transition and 
compatibility when enabling the segmented snapshot feature.
    
    ## Modifications:
    1. Updated the `recoverFromSnapshot()` method to fall back to reading the 
legacy `__transaction_buffer_snapshot` system topic when 
`persistentSnapshotIndexes` is null. This ensures that aborted-transaction 
data written before the upgrade is still recovered once the segmented 
snapshot feature is enabled.
    2. Created a new test `testSnapshotProcessorUpgrade()` that verifies the 
compatibility of the transaction buffer segmented snapshot feature when it is 
enabled on an existing topic.
    
    ### Verifying this change
    
    - [ ] Make sure that the change passes the CI checks.
    
    *(Please pick either of the following options)*
    
    This change is a trivial rework / code cleanup without any test coverage.
    
    *(or)*
    
    This change is already covered by existing tests, such as *(please describe 
tests)*.
    
    *(or)*
    
    This change added tests and can be verified as follows:
    
    *(example:)*
      - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
      - *Extended integration test for recovery after broker failure*
    
    (cherry picked from commit 8b929e6bf1b1718431abebf04420d7817ad8cdb7)
---
 .../SnapshotSegmentAbortedTxnProcessorImpl.java    |  66 +++++++++-
 .../SegmentAbortedTxnProcessorTest.java            | 133 ++++++++++++++++++++-
 2 files changed, 196 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index 751c03aff95..4f4e58ac3f5 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -47,6 +47,7 @@ import 
org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.Refe
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.systopic.SystemTopicClient;
 import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
+import 
org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
@@ -265,7 +266,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
                     PositionImpl finalStartReadCursorPosition = 
startReadCursorPosition;
                     TransactionBufferSnapshotIndexes 
finalPersistentSnapshotIndexes = persistentSnapshotIndexes;
                     if (persistentSnapshotIndexes == null) {
-                        return CompletableFuture.completedFuture(null);
+                        return recoverOldSnapshot();
                     } else {
                         this.unsealedTxnIds = 
convertTypeToTxnID(persistentSnapshotIndexes
                                 .getSnapshot().getAborts());
@@ -378,6 +379,69 @@ public class SnapshotSegmentAbortedTxnProcessorImpl 
implements AbortedTxnProcess
                         .getExecutor(this));
     }
 
+    // This method will be deprecated and removed in version 4.x.0
+    private CompletableFuture<PositionImpl> recoverOldSnapshot() {
+        return 
topic.getBrokerService().getTopic(TopicName.get(topic.getName()).getNamespace() 
+ "/"
+                        + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT, false)
+                .thenCompose(topicOption -> {
+                    if (!topicOption.isPresent()) {
+                        return CompletableFuture.completedFuture(null);
+                    } else {
+                        return 
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+                                .getTxnBufferSnapshotService()
+                                
.createReader(TopicName.get(topic.getName())).thenComposeAsync(snapshotReader 
-> {
+                                    PositionImpl 
startReadCursorPositionInOldSnapshot = null;
+                                    try {
+                                        while (snapshotReader.hasMoreEvents()) 
{
+                                            Message<TransactionBufferSnapshot> 
message = snapshotReader.readNextAsync()
+                                                    
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
+                                            if 
(topic.getName().equals(message.getKey())) {
+                                                TransactionBufferSnapshot 
transactionBufferSnapshot =
+                                                        message.getValue();
+                                                if (transactionBufferSnapshot 
!= null) {
+                                                    
handleOldSnapshot(transactionBufferSnapshot);
+                                                    
startReadCursorPositionInOldSnapshot = PositionImpl.get(
+                                                            
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                                            
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                                                }
+                                            }
+                                        }
+                                    } catch (TimeoutException ex) {
+                                        Throwable t = 
FutureUtil.unwrapCompletionException(ex);
+                                        String errorMessage = 
String.format("[%s] Transaction buffer recover fail by "
+                                                + "read 
transactionBufferSnapshot timeout!", topic.getName());
+                                        log.error(errorMessage, t);
+                                        return FutureUtil.failedFuture(new 
BrokerServiceException
+                                                
.ServiceUnitNotReadyException(errorMessage, t));
+                                    } catch (Exception ex) {
+                                        log.error("[{}] Transaction buffer 
recover fail when read "
+                                                + 
"transactionBufferSnapshot!", topic.getName(), ex);
+                                        return FutureUtil.failedFuture(ex);
+                                    } finally {
+                                        assert snapshotReader != null;
+                                        closeReader(snapshotReader);
+                                    }
+                                    return 
CompletableFuture.completedFuture(startReadCursorPositionInOldSnapshot);
+                                },
+                                        
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
+                                                .getExecutor(this));
+                    }
+                });
+    }
+
+    // This method will be deprecated and removed in version 4.x.0
+    private void handleOldSnapshot(TransactionBufferSnapshot snapshot) {
+        if (snapshot.getAborts() != null) {
+            snapshot.getAborts().forEach(abortTxnMetadata -> {
+                TxnID txnID = new TxnID(abortTxnMetadata.getTxnIdMostBits(),
+                        abortTxnMetadata.getTxnIdLeastBits());
+                aborts.put(txnID, txnID);
+                //The old data will be written into the first segment.
+                unsealedTxnIds.add(txnID);
+            });
+        }
+    }
+
     @Override
     public CompletableFuture<Void> clearAbortedTxnSnapshot() {
         return 
persistentWorker.appendTask(PersistentWorker.OperationType.Clear,
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
index c157d7cf8c5..cb15ab003f7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.transaction;
 
+import static org.junit.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -44,10 +45,16 @@ import 
org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
 import 
org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
 import 
org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -247,8 +254,8 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
     private void verifySnapshotSegmentsSize(String topic, int size) throws 
Exception {
         SystemTopicClient.Reader<TransactionBufferSnapshotSegment> reader =
                 pulsarService.getTransactionBufferSnapshotServiceFactory()
-                .getTxnBufferSnapshotSegmentService()
-                .createReader(TopicName.get(topic)).get();
+                        .getTxnBufferSnapshotSegmentService()
+                        .createReader(TopicName.get(topic)).get();
         int segmentCount = 0;
         while (reader.hasMoreEvents()) {
             Message<TransactionBufferSnapshotSegment> message = 
reader.readNextAsync()
@@ -286,4 +293,126 @@ public class SegmentAbortedTxnProcessorTest extends 
TransactionTestBase {
         CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) 
field.get(snapshotTopic);
         org.awaitility.Awaitility.await().untilAsserted(() -> 
assertTrue(compactionFuture.isDone()));
     }
+
+    /**
+     * This test verifies the compatibility of the transaction buffer 
segmented snapshot feature
+     * when enabled on an existing topic.
+     * It performs the following steps:
+     * 1. Creates a topic with segmented snapshot disabled.
+     * 2. Sends 10 messages without using transactions.
+     * 3. Sends 10 messages using transactions and aborts them.
+     * 4. Sends 10 messages without using transactions.
+     * 5. Verifies that only the non-transactional messages are received.
+     * 6. Enables the segmented snapshot feature and sets the snapshot segment 
size.
+     * 7. Unloads the topic.
+     * 8. Sends a new message using a transaction and aborts it.
+     * 9. Verifies that the topic has exactly one segment.
+     * 10. Re-subscribes the consumer and re-verifies that only the 
non-transactional messages are received.
+     */
+    @Test
+    public void testSnapshotProcessorUpgrade() throws Exception {
+        this.pulsarService = getPulsarServiceList().get(0);
+        
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+        // Create a topic, send 10 messages without using transactions, and 
send 10 messages using transactions.
+        // Abort these transactions and verify the data.
+        final String topicName = "persistent://" + NAMESPACE1 + 
"/testSnapshotProcessorUpgrade";
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+
+        // Send 10 messages without using transactions
+        for (int i = 0; i < 10; i++) {
+            producer.send(("test-message-" + i).getBytes());
+        }
+
+        // Send 10 messages using transactions and abort them
+        for (int i = 0; i < 10; i++) {
+            Transaction txn = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.SECONDS)
+                    .build().get();
+            producer.newMessage(txn).value(("test-txn-message-" + 
i).getBytes()).sendAsync();
+            txn.abort().get();
+        }
+
+        // Send 10 messages without using transactions
+        for (int i = 10; i < 20; i++) {
+            producer.send(("test-message-" + i).getBytes());
+        }
+
+        // Verify the data
+        for (int i = 0; i < 20; i++) {
+            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertEquals("test-message-" + i, new String(msg.getData()));
+        }
+
+        // Enable segmented snapshot
+        
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+        
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 + 
PROCESSOR_TOPIC.length() +
+                SEGMENT_SIZE * 3);
+
+        // Unload the topic
+        admin.topics().unload(topicName);
+
+        // Sends a new message using a transaction and aborts it.
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+        producer.newMessage(txn).value("test-message-new".getBytes()).send();
+        txn.abort().get();
+
+        // Verifies that the topic has exactly one segment.
+        Awaitility.await().untilAsserted(() -> {
+            String segmentTopic = "persistent://" + NAMESPACE1 + "/" +
+                    SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS;
+            TopicStats topicStats = admin.topics().getStats(segmentTopic);
+            assertEquals(1, topicStats.getMsgInCounter());
+        });
+
+        // Re-subscribes the consumer and re-verifies that only the 
non-transactional messages are received.
+        consumer.close();
+        consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+        for (int i = 0; i < 20; i++) {
+            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertEquals("test-message-" + i, new String(msg.getData()));
+        }
+    }
+
+    /**
+     * This test verifies that when the segmented snapshot feature is enabled, 
creating a new topic
+     * does not create a __transaction_buffer_snapshot topic in the same 
namespace.
+     * The test performs the following steps:
+     * 1. Enable the segmented snapshot feature.
+     * 2. Create a new namespace.
+     * 3. Create a new topic in the namespace.
+     * 4. Check that the __transaction_buffer_snapshot topic is not created in 
the same namespace.
+     * 5. Destroy the namespace after the test.
+     */
+    @Test
+    public void testSegmentedSnapshotWithoutCreatingOldSnapshotTopic() throws 
Exception {
+        // Enable the segmented snapshot feature
+        pulsarService = getPulsarServiceList().get(0);
+        
pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+
+        // Create a new namespace
+        String namespaceName = 
"tnx/testSegmentedSnapshotWithoutCreatingOldSnapshotTopic";
+        admin.namespaces().createNamespace(namespaceName);
+
+        // Create a new topic in the namespace
+        String topicName = "persistent://" + namespaceName + "/newTopic";
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        producer.close();
+
+        // Check that the __transaction_buffer_snapshot topic is not created 
in the same namespace
+        String transactionBufferSnapshotTopic = "persistent://" + 
namespaceName + "/" +
+                SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT;
+        try {
+            admin.topics().getStats(transactionBufferSnapshotTopic);
+            fail("The __transaction_buffer_snapshot topic should not exist");
+        } catch (PulsarAdminException e) {
+            assertEquals(e.getStatusCode(), 404);
+        }
+
+        // Destroy the namespace after the test
+        admin.namespaces().deleteNamespace(namespaceName, true);
+    }
 }

Reply via email to