liangyepianzhou commented on code in PR #20235:
URL: https://github.com/apache/pulsar/pull/20235#discussion_r1186666749
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java:
##########
@@ -286,4 +292,79 @@ private void doCompaction(TopicName topic) throws Exception {
CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) field.get(snapshotTopic);
org.awaitility.Awaitility.await().untilAsserted(() -> assertTrue(compactionFuture.isDone()));
}
+
+ /**
+ * This test verifies the compatibility of the transaction buffer segmented snapshot feature
+ * when enabled on an existing topic.
+ * It performs the following steps:
+ * 1. Creates a topic with segmented snapshot disabled.
+ * 2. Sends 10 messages without using transactions.
+ * 3. Sends 10 messages using transactions and aborts them.
+ * 4. Verifies that only the non-transactional messages are received.
+ * 5. Enables the segmented snapshot feature and sets the snapshot segment size.
+ * 6. Unloads the topic and re-verifies that only the non-transactional messages are received.
+ * 7. Sends a new message, and checks if the topic has exactly one segment.
+ */
+ @Test
+ public void testSnapshotProcessorUpdate() throws Exception {
+ this.pulsarService = getPulsarServiceList().get(0);
+ this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+ // Create a topic, send 10 messages without using transactions, and send 10 messages using transactions.
+ // Abort these transactions and verify the data.
+ final String topicName = "persistent://" + NAMESPACE1 + "/testSnapshotProcessorUpdate";
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+
+ // Send 10 messages without using transactions
+ for (int i = 0; i < 10; i++) {
+ producer.send(("test-message-" + i).getBytes());
+ }
+
+ // Send 10 messages using transactions and abort them
+ for (int i = 0; i < 10; i++) {
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ producer.newMessage(txn).value(("test-txn-message-" + i).getBytes()).sendAsync();
+ txn.abort().get();
+ }
+
Review Comment:
Sorry, I don’t understand this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.