congbobo184 commented on code in PR #20235:
URL: https://github.com/apache/pulsar/pull/20235#discussion_r1186612138
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java:
##########
@@ -378,6 +379,59 @@ public void
openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob
.getExecutor(this));
}
+ // This method will be deprecated and removed in version 4.x.0
+ private CompletableFuture<PositionImpl> recoverOldSnapshot() {
+ return
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
+ .getTxnBufferSnapshotService()
+
.createReader(TopicName.get(topic.getName())).thenComposeAsync(snapshotReader
-> {
+ PositionImpl startReadCursorPositionInOldSnapshot = null;
+ try {
+ while (snapshotReader.hasMoreEvents()) {
+ Message<TransactionBufferSnapshot> message =
snapshotReader.readNextAsync()
+ .get(getSystemClientOperationTimeoutMs(),
TimeUnit.MILLISECONDS);
+ if (topic.getName().equals(message.getKey())) {
+ TransactionBufferSnapshot
transactionBufferSnapshot =
+ message.getValue();
+ if (transactionBufferSnapshot != null) {
+
handleOldSnapshot(transactionBufferSnapshot);
+ startReadCursorPositionInOldSnapshot =
PositionImpl.get(
+
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
+
transactionBufferSnapshot.getMaxReadPositionEntryId());
+ }
+ }
+ }
+ } catch (TimeoutException ex) {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ String errorMessage = String.format("[%s] Transaction
buffer recover fail by "
+ + "read transactionBufferSnapshot timeout!",
topic.getName());
+ log.error(errorMessage, t);
+ return FutureUtil.failedFuture(new
BrokerServiceException
+ .ServiceUnitNotReadyException(errorMessage,
t));
+ } catch (Exception ex) {
+ log.error("[{}] Transaction buffer recover fail when
read "
+ + "transactionBufferSnapshot!",
topic.getName(), ex);
+ return FutureUtil.failedFuture(ex);
+ } finally {
+ assert snapshotReader != null;
+ closeReader(snapshotReader);
+ }
+ return
CompletableFuture.completedFuture(startReadCursorPositionInOldSnapshot);
+ });
Review Comment:
```suggestion
},
topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
.getExecutor(this));
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java:
##########
@@ -286,4 +292,79 @@ private void doCompaction(TopicName topic) throws
Exception {
CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>)
field.get(snapshotTopic);
org.awaitility.Awaitility.await().untilAsserted(() ->
assertTrue(compactionFuture.isDone()));
}
+
+ /**
+ * This test verifies the compatibility of the transaction buffer
segmented snapshot feature
+ * when enabled on an existing topic.
+ * It performs the following steps:
+ * 1. Creates a topic with segmented snapshot disabled.
+ * 2. Sends 10 messages without using transactions.
+ * 3. Sends 10 messages using transactions and aborts them.
+ * 4. Verifies that only the non-transactional messages are received.
+ * 5. Enables the segmented snapshot feature and sets the snapshot segment
size.
+ * 6. Unloads the topic and re-verifies that only the non-transactional
messages are received.
+ * 7. Sends a new message, and checks if the topic has exactly one segment.
+ */
+ @Test
+ public void testSnapshotProcessorUpdate() throws Exception {
Review Comment:
```suggestion
public void testSnapshotProcessorUpgrade() throws Exception {
```
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java:
##########
@@ -265,7 +266,7 @@ public CompletableFuture<PositionImpl>
recoverFromSnapshot() {
PositionImpl finalStartReadCursorPosition =
startReadCursorPosition;
TransactionBufferSnapshotIndexes
finalPersistentSnapshotIndexes = persistentSnapshotIndexes;
if (persistentSnapshotIndexes == null) {
- return CompletableFuture.completedFuture(null);
+ return recoverOldSnapshot();
Review Comment:
```suggestion
return recoverOldSnapshot();
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java:
##########
@@ -286,4 +292,79 @@ private void doCompaction(TopicName topic) throws
Exception {
CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>)
field.get(snapshotTopic);
org.awaitility.Awaitility.await().untilAsserted(() ->
assertTrue(compactionFuture.isDone()));
}
+
+ /**
+ * This test verifies the compatibility of the transaction buffer
segmented snapshot feature
+ * when enabled on an existing topic.
+ * It performs the following steps:
+ * 1. Creates a topic with segmented snapshot disabled.
+ * 2. Sends 10 messages without using transactions.
+ * 3. Sends 10 messages using transactions and aborts them.
+ * 4. Verifies that only the non-transactional messages are received.
+ * 5. Enables the segmented snapshot feature and sets the snapshot segment
size.
+ * 6. Unloads the topic and re-verifies that only the non-transactional
messages are received.
+ * 7. Sends a new message, and checks if the topic has exactly one segment.
+ */
+ @Test
+ public void testSnapshotProcessorUpdate() throws Exception {
+ this.pulsarService = getPulsarServiceList().get(0);
+
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+ // Create a topic, send 10 messages without using transactions, and
send 10 messages using transactions.
+ // Abort these transactions and verify the data.
+ final String topicName = "persistent://" + NAMESPACE1 +
"/testSnapshotProcessorUpdate";
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+
+ // Send 10 messages without using transactions
+ for (int i = 0; i < 10; i++) {
+ producer.send(("test-message-" + i).getBytes());
+ }
+
+ // Send 10 messages using transactions and abort them
+ for (int i = 0; i < 10; i++) {
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ producer.newMessage(txn).value(("test-txn-message-" +
i).getBytes()).sendAsync();
+ txn.abort().get();
+ }
+
+ // Verify the data
+ for (int i = 0; i < 10; i++) {
+ Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertEquals("test-message-" + i, new String(msg.getData()));
+ consumer.acknowledge(msg);
Review Comment:
Why acknowledge the message here?
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java:
##########
@@ -286,4 +292,79 @@ private void doCompaction(TopicName topic) throws
Exception {
CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>)
field.get(snapshotTopic);
org.awaitility.Awaitility.await().untilAsserted(() ->
assertTrue(compactionFuture.isDone()));
}
+
+ /**
+ * This test verifies the compatibility of the transaction buffer
segmented snapshot feature
+ * when enabled on an existing topic.
+ * It performs the following steps:
+ * 1. Creates a topic with segmented snapshot disabled.
+ * 2. Sends 10 messages without using transactions.
+ * 3. Sends 10 messages using transactions and aborts them.
+ * 4. Verifies that only the non-transactional messages are received.
+ * 5. Enables the segmented snapshot feature and sets the snapshot segment
size.
+ * 6. Unloads the topic and re-verifies that only the non-transactional
messages are received.
+ * 7. Sends a new message, and checks if the topic has exactly one segment.
+ */
+ @Test
+ public void testSnapshotProcessorUpdate() throws Exception {
+ this.pulsarService = getPulsarServiceList().get(0);
+
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+ // Create a topic, send 10 messages without using transactions, and
send 10 messages using transactions.
+ // Abort these transactions and verify the data.
+ final String topicName = "persistent://" + NAMESPACE1 +
"/testSnapshotProcessorUpdate";
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+
+ // Send 10 messages without using transactions
+ for (int i = 0; i < 10; i++) {
+ producer.send(("test-message-" + i).getBytes());
+ }
+
+ // Send 10 messages using transactions and abort them
+ for (int i = 0; i < 10; i++) {
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ producer.newMessage(txn).value(("test-txn-message-" +
i).getBytes()).sendAsync();
+ txn.abort().get();
+ }
+
+ // Verify the data
+ for (int i = 0; i < 10; i++) {
+ Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertEquals("test-message-" + i, new String(msg.getData()));
+ consumer.acknowledge(msg);
+ }
+
+ // Enable segmented snapshot
+
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 +
PROCESSOR_TOPIC.length() +
+ SEGMENT_SIZE * 3);
+
+ // Unload the topic and re-verify the data
+ admin.topics().unload(topicName);
+ consumer.close();
+ consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+ for (int i = 0; i < 10; i++) {
Review Comment:
Verify that messages 0 - 20 can be received.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java:
##########
@@ -286,4 +292,79 @@ private void doCompaction(TopicName topic) throws
Exception {
CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>)
field.get(snapshotTopic);
org.awaitility.Awaitility.await().untilAsserted(() ->
assertTrue(compactionFuture.isDone()));
}
+
+ /**
+ * This test verifies the compatibility of the transaction buffer
segmented snapshot feature
+ * when enabled on an existing topic.
+ * It performs the following steps:
+ * 1. Creates a topic with segmented snapshot disabled.
+ * 2. Sends 10 messages without using transactions.
+ * 3. Sends 10 messages using transactions and aborts them.
+ * 4. Verifies that only the non-transactional messages are received.
+ * 5. Enables the segmented snapshot feature and sets the snapshot segment
size.
+ * 6. Unloads the topic and re-verifies that only the non-transactional
messages are received.
+ * 7. Sends a new message, and checks if the topic has exactly one segment.
+ */
+ @Test
+ public void testSnapshotProcessorUpdate() throws Exception {
+ this.pulsarService = getPulsarServiceList().get(0);
+
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+ // Create a topic, send 10 messages without using transactions, and
send 10 messages using transactions.
+ // Abort these transactions and verify the data.
+ final String topicName = "persistent://" + NAMESPACE1 +
"/testSnapshotProcessorUpdate";
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+
+ // Send 10 messages without using transactions
+ for (int i = 0; i < 10; i++) {
+ producer.send(("test-message-" + i).getBytes());
+ }
+
+ // Send 10 messages using transactions and abort them
+ for (int i = 0; i < 10; i++) {
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ producer.newMessage(txn).value(("test-txn-message-" +
i).getBytes()).sendAsync();
+ txn.abort().get();
+ }
+
+ // Verify the data
+ for (int i = 0; i < 10; i++) {
+ Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertEquals("test-message-" + i, new String(msg.getData()));
+ consumer.acknowledge(msg);
+ }
+
+ // Enable segmented snapshot
+
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 +
PROCESSOR_TOPIC.length() +
+ SEGMENT_SIZE * 3);
+
+ // Unload the topic and re-verify the data
+ admin.topics().unload(topicName);
+ consumer.close();
+ consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+ for (int i = 0; i < 10; i++) {
+ Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
+ assertEquals("test-message-" + i, new String(msg.getData()));
+ consumer.acknowledge(msg);
+ }
+
+ // Send a message, unload the topic, and verify that the topic has
exactly one segment
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ producer.newMessage(txn).value("test-message-new".getBytes()).send();
+ txn.abort().get();
+
+ // Check if the topic has only one segment
+ Awaitility.await().untilAsserted(() -> {
+ String segmentTopic = "persistent://" + NAMESPACE1 + "/" +
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS;
+ TopicStats topicStats = admin.topics().getStats(segmentTopic);
+ assertEquals(1, topicStats.getMsgInCounter());
+ });
+ }
Review Comment:
Check this before receiving the messages.
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/SegmentAbortedTxnProcessorTest.java:
##########
@@ -286,4 +292,79 @@ private void doCompaction(TopicName topic) throws
Exception {
CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>)
field.get(snapshotTopic);
org.awaitility.Awaitility.await().untilAsserted(() ->
assertTrue(compactionFuture.isDone()));
}
+
+ /**
+ * This test verifies the compatibility of the transaction buffer
segmented snapshot feature
+ * when enabled on an existing topic.
+ * It performs the following steps:
+ * 1. Creates a topic with segmented snapshot disabled.
+ * 2. Sends 10 messages without using transactions.
+ * 3. Sends 10 messages using transactions and aborts them.
+ * 4. Verifies that only the non-transactional messages are received.
+ * 5. Enables the segmented snapshot feature and sets the snapshot segment
size.
+ * 6. Unloads the topic and re-verifies that only the non-transactional
messages are received.
+ * 7. Sends a new message, and checks if the topic has exactly one segment.
+ */
+ @Test
+ public void testSnapshotProcessorUpdate() throws Exception {
+ this.pulsarService = getPulsarServiceList().get(0);
+
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
+
+ // Create a topic, send 10 messages without using transactions, and
send 10 messages using transactions.
+ // Abort these transactions and verify the data.
+ final String topicName = "persistent://" + NAMESPACE1 +
"/testSnapshotProcessorUpdate";
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName).create();
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
+
+ // Send 10 messages without using transactions
+ for (int i = 0; i < 10; i++) {
+ producer.send(("test-message-" + i).getBytes());
+ }
+
+ // Send 10 messages using transactions and abort them
+ for (int i = 0; i < 10; i++) {
+ Transaction txn = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+ producer.newMessage(txn).value(("test-txn-message-" +
i).getBytes()).sendAsync();
+ txn.abort().get();
+ }
+
Review Comment:
Send messages 10 - 20, then verify that messages 0 - 20 can be received.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]