congbobo184 commented on code in PR #20235:
URL: https://github.com/apache/pulsar/pull/20235#discussion_r1186612138


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java:
##########
@@ -378,6 +379,59 @@ public void 
openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob
                         .getExecutor(this));
     }
 
+    // This method will be deprecated and removed in version 4.x.0
+    private CompletableFuture<PositionImpl> recoverOldSnapshot() {
+        return 
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+                .getTxnBufferSnapshotService()
+                
.createReader(TopicName.get(topic.getName())).thenComposeAsync(snapshotReader 
-> {
+                    PositionImpl startReadCursorPositionInOldSnapshot = null;
+                    try {
+                        while (snapshotReader.hasMoreEvents()) {
+                            Message<TransactionBufferSnapshot> message = 
snapshotReader.readNextAsync()
+                                    .get(getSystemClientOperationTimeoutMs(), 
TimeUnit.MILLISECONDS);
+                            if (topic.getName().equals(message.getKey())) {
+                                TransactionBufferSnapshot 
transactionBufferSnapshot =
+                                        message.getValue();
+                                if (transactionBufferSnapshot != null) {
+                                    
handleOldSnapshot(transactionBufferSnapshot);
+                                    startReadCursorPositionInOldSnapshot = 
PositionImpl.get(
+                                            
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+                                            
transactionBufferSnapshot.getMaxReadPositionEntryId());
+                                }
+                            }
+                        }
+                    } catch (TimeoutException ex) {
+                        Throwable t = FutureUtil.unwrapCompletionException(ex);
+                        String errorMessage = String.format("[%s] Transaction 
buffer recover fail by "
+                                + "read transactionBufferSnapshot timeout!", 
topic.getName());
+                        log.error(errorMessage, t);
+                        return FutureUtil.failedFuture(new 
BrokerServiceException
+                                .ServiceUnitNotReadyException(errorMessage, 
t));
+                    } catch (Exception ex) {
+                        log.error("[{}] Transaction buffer recover fail when 
read "
+                                + "transactionBufferSnapshot!", 
topic.getName(), ex);
+                        return FutureUtil.failedFuture(ex);
+                    } finally {
+                        assert snapshotReader != null;
+                        closeReader(snapshotReader);
+                    }
+                    return 
CompletableFuture.completedFuture(startReadCursorPositionInOldSnapshot);
+                });

Review Comment:
   ```suggestion
                       },  
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
                           .getExecutor(this));
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java:
##########
@@ -286,4 +292,79 @@ private void doCompaction(TopicName topic) throws 
Exception {
         CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) 
field.get(snapshotTopic);
         org.awaitility.Awaitility.await().untilAsserted(() -> 
assertTrue(compactionFuture.isDone()));
     }
+
+    /**
+     * This test verifies the compatibility of the transaction buffer 
segmented snapshot feature
+     * when enabled on an existing topic.
+     * It performs the following steps:
+     * 1. Creates a topic with segmented snapshot disabled.
+     * 2. Sends 10 messages without using transactions.
+     * 3. Sends 10 messages using transactions and aborts them.
+     * 4. Verifies that only the non-transactional messages are received.
+     * 5. Enables the segmented snapshot feature and sets the snapshot segment 
size.
+     * 6. Unloads the topic and re-verifies that only the non-transactional 
messages are received.
+     * 7. Sends a new message, and checks if the topic has exactly one segment.
+     */
+    @Test
+    public void testSnapshotProcessorUpdate() throws Exception {

Review Comment:
   ```suggestion
       public void testSnapshotProcessorUpgrade() throws Exception {
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java:
##########
@@ -265,7 +266,7 @@ public CompletableFuture<PositionImpl> 
recoverFromSnapshot() {
                     PositionImpl finalStartReadCursorPosition = 
startReadCursorPosition;
                     TransactionBufferSnapshotIndexes 
finalPersistentSnapshotIndexes = persistentSnapshotIndexes;
                     if (persistentSnapshotIndexes == null) {
-                        return CompletableFuture.completedFuture(null);
+                       return recoverOldSnapshot();

Review Comment:
   ```suggestion
                           return recoverOldSnapshot();
   ```



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java:
##########
@@ -286,4 +292,79 @@ private void doCompaction(TopicName topic) throws 
Exception {
         CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) 
field.get(snapshotTopic);
         org.awaitility.Awaitility.await().untilAsserted(() -> 
assertTrue(compactionFuture.isDone()));
     }
+
+    /**
+     * This test verifies the compatibility of the transaction buffer 
segmented snapshot feature
+     * when enabled on an existing topic.
+     * It performs the following steps:
+     * 1. Creates a topic with segmented snapshot disabled.
+     * 2. Sends 10 messages without using transactions.
+     * 3. Sends 10 messages using transactions and aborts them.
+     * 4. Verifies that only the non-transactional messages are received.
+     * 5. Enables the segmented snapshot feature and sets the snapshot segment 
size.
+     * 6. Unloads the topic and re-verifies that only the non-transactional 
messages are received.
+     * 7. Sends a new message, and checks if the topic has exactly one segment.
+     */
+    @Test
+    public void testSnapshotProcessorUpdate() throws Exception {
+        this.pulsarService = getPulsarServiceList().get(0);
+        
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+        // Create a topic, send 10 messages without using transactions, and 
send 10 messages using transactions.
+        // Abort these transactions and verify the data.
+        final String topicName = "persistent://" + NAMESPACE1 + 
"/testSnapshotProcessorUpdate";
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+
+        // Send 10 messages without using transactions
+        for (int i = 0; i < 10; i++) {
+            producer.send(("test-message-" + i).getBytes());
+        }
+
+        // Send 10 messages using transactions and abort them
+        for (int i = 0; i < 10; i++) {
+            Transaction txn = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.SECONDS)
+                    .build().get();
+            producer.newMessage(txn).value(("test-txn-message-" + 
i).getBytes()).sendAsync();
+            txn.abort().get();
+        }
+
+        // Verify the data
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertEquals("test-message-" + i, new String(msg.getData()));
+            consumer.acknowledge(msg);

Review Comment:
   Why acknowledge the message here?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java:
##########
@@ -286,4 +292,79 @@ private void doCompaction(TopicName topic) throws 
Exception {
         CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) 
field.get(snapshotTopic);
         org.awaitility.Awaitility.await().untilAsserted(() -> 
assertTrue(compactionFuture.isDone()));
     }
+
+    /**
+     * This test verifies the compatibility of the transaction buffer 
segmented snapshot feature
+     * when enabled on an existing topic.
+     * It performs the following steps:
+     * 1. Creates a topic with segmented snapshot disabled.
+     * 2. Sends 10 messages without using transactions.
+     * 3. Sends 10 messages using transactions and aborts them.
+     * 4. Verifies that only the non-transactional messages are received.
+     * 5. Enables the segmented snapshot feature and sets the snapshot segment 
size.
+     * 6. Unloads the topic and re-verifies that only the non-transactional 
messages are received.
+     * 7. Sends a new message, and checks if the topic has exactly one segment.
+     */
+    @Test
+    public void testSnapshotProcessorUpdate() throws Exception {
+        this.pulsarService = getPulsarServiceList().get(0);
+        
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+        // Create a topic, send 10 messages without using transactions, and 
send 10 messages using transactions.
+        // Abort these transactions and verify the data.
+        final String topicName = "persistent://" + NAMESPACE1 + 
"/testSnapshotProcessorUpdate";
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+
+        // Send 10 messages without using transactions
+        for (int i = 0; i < 10; i++) {
+            producer.send(("test-message-" + i).getBytes());
+        }
+
+        // Send 10 messages using transactions and abort them
+        for (int i = 0; i < 10; i++) {
+            Transaction txn = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.SECONDS)
+                    .build().get();
+            producer.newMessage(txn).value(("test-txn-message-" + 
i).getBytes()).sendAsync();
+            txn.abort().get();
+        }
+
+        // Verify the data
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertEquals("test-message-" + i, new String(msg.getData()));
+            consumer.acknowledge(msg);
+        }
+
+        // Enable segmented snapshot
+        
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+        
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 + 
PROCESSOR_TOPIC.length() +
+                SEGMENT_SIZE * 3);
+
+        // Unload the topic and re-verify the data
+        admin.topics().unload(topicName);
+        consumer.close();
+        consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+        for (int i = 0; i < 10; i++) {

Review Comment:
   Verify that messages 0 - 20 can be received.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java:
##########
@@ -286,4 +292,79 @@ private void doCompaction(TopicName topic) throws 
Exception {
         CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) 
field.get(snapshotTopic);
         org.awaitility.Awaitility.await().untilAsserted(() -> 
assertTrue(compactionFuture.isDone()));
     }
+
+    /**
+     * This test verifies the compatibility of the transaction buffer 
segmented snapshot feature
+     * when enabled on an existing topic.
+     * It performs the following steps:
+     * 1. Creates a topic with segmented snapshot disabled.
+     * 2. Sends 10 messages without using transactions.
+     * 3. Sends 10 messages using transactions and aborts them.
+     * 4. Verifies that only the non-transactional messages are received.
+     * 5. Enables the segmented snapshot feature and sets the snapshot segment 
size.
+     * 6. Unloads the topic and re-verifies that only the non-transactional 
messages are received.
+     * 7. Sends a new message, and checks if the topic has exactly one segment.
+     */
+    @Test
+    public void testSnapshotProcessorUpdate() throws Exception {
+        this.pulsarService = getPulsarServiceList().get(0);
+        
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+        // Create a topic, send 10 messages without using transactions, and 
send 10 messages using transactions.
+        // Abort these transactions and verify the data.
+        final String topicName = "persistent://" + NAMESPACE1 + 
"/testSnapshotProcessorUpdate";
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+
+        // Send 10 messages without using transactions
+        for (int i = 0; i < 10; i++) {
+            producer.send(("test-message-" + i).getBytes());
+        }
+
+        // Send 10 messages using transactions and abort them
+        for (int i = 0; i < 10; i++) {
+            Transaction txn = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.SECONDS)
+                    .build().get();
+            producer.newMessage(txn).value(("test-txn-message-" + 
i).getBytes()).sendAsync();
+            txn.abort().get();
+        }
+
+        // Verify the data
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertEquals("test-message-" + i, new String(msg.getData()));
+            consumer.acknowledge(msg);
+        }
+
+        // Enable segmented snapshot
+        
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+        
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 + 
PROCESSOR_TOPIC.length() +
+                SEGMENT_SIZE * 3);
+
+        // Unload the topic and re-verify the data
+        admin.topics().unload(topicName);
+        consumer.close();
+        consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+        for (int i = 0; i < 10; i++) {
+            Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+            assertEquals("test-message-" + i, new String(msg.getData()));
+            consumer.acknowledge(msg);
+        }
+
+        // Send a message, unload the topic, and verify that the topic has 
exactly one segment
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+        producer.newMessage(txn).value("test-message-new".getBytes()).send();
+        txn.abort().get();
+
+        // Check if the topic has only one segment
+        Awaitility.await().untilAsserted(() -> {
+            String segmentTopic = "persistent://" + NAMESPACE1 + "/" +
+                    SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS;
+            TopicStats topicStats = admin.topics().getStats(segmentTopic);
+            assertEquals(1, topicStats.getMsgInCounter());
+        });
+    }

Review Comment:
   Check this before receiving the messages.



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java:
##########
@@ -286,4 +292,79 @@ private void doCompaction(TopicName topic) throws 
Exception {
         CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) 
field.get(snapshotTopic);
         org.awaitility.Awaitility.await().untilAsserted(() -> 
assertTrue(compactionFuture.isDone()));
     }
+
+    /**
+     * This test verifies the compatibility of the transaction buffer 
segmented snapshot feature
+     * when enabled on an existing topic.
+     * It performs the following steps:
+     * 1. Creates a topic with segmented snapshot disabled.
+     * 2. Sends 10 messages without using transactions.
+     * 3. Sends 10 messages using transactions and aborts them.
+     * 4. Verifies that only the non-transactional messages are received.
+     * 5. Enables the segmented snapshot feature and sets the snapshot segment 
size.
+     * 6. Unloads the topic and re-verifies that only the non-transactional 
messages are received.
+     * 7. Sends a new message, and checks if the topic has exactly one segment.
+     */
+    @Test
+    public void testSnapshotProcessorUpdate() throws Exception {
+        this.pulsarService = getPulsarServiceList().get(0);
+        
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+        // Create a topic, send 10 messages without using transactions, and 
send 10 messages using transactions.
+        // Abort these transactions and verify the data.
+        final String topicName = "persistent://" + NAMESPACE1 + 
"/testSnapshotProcessorUpdate";
+        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topicName).create();
+        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+
+        // Send 10 messages without using transactions
+        for (int i = 0; i < 10; i++) {
+            producer.send(("test-message-" + i).getBytes());
+        }
+
+        // Send 10 messages using transactions and abort them
+        for (int i = 0; i < 10; i++) {
+            Transaction txn = pulsarClient.newTransaction()
+                    .withTransactionTimeout(5, TimeUnit.SECONDS)
+                    .build().get();
+            producer.newMessage(txn).value(("test-txn-message-" + 
i).getBytes()).sendAsync();
+            txn.abort().get();
+        }
+

Review Comment:
   Send messages 10 - 20, then verify that messages 0 - 20 can be received.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to