liangyepianzhou commented on code in PR #18273:
URL: https://github.com/apache/pulsar/pull/18273#discussion_r1095347074
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java:
##########
@@ -818,9 +820,98 @@ public void
openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob
//verify snapshot
assertEquals(snapshot.getTopicName(), snapshotTopic);
assertEquals(snapshot.getSequenceId(), 2L);
- assertEquals(snapshot.getMaxReadPositionLedgerId(), 2L);
- assertEquals(snapshot.getMaxReadPositionEntryId(), 3L);
+ assertEquals(snapshot.getPersistentPositionLedgerId(), 2L);
+ assertEquals(snapshot.getPersistentPositionEntryId(), 3L);
assertEquals(snapshot.getAborts().toArray()[0], new TxnIDData(1, 1));
}
+ //Verify the snapshotSegmentProcessor end to end
+ @Test
+ public void testSnapshotSegment() throws Exception {
+ String topic = NAMESPACE1 + "/testSnapshotSegment";
+ String subName = "testSnapshotSegment";
+
+ LinkedMap<Transaction, MessageId> ongoingTxns = new LinkedMap<>();
+ LinkedList<MessageId> abortedTxns = new LinkedList<>();
+ // 0. Modify the configurations, enabling the segment snapshot and set
the size of the snapshot segment.
+ int theSizeOfSegment = 10;
+ int theCountOfSnapshotMaxTxnCount = 3;
+
this.getPulsarServiceList().get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
+
this.getPulsarServiceList().get(0).getConfig().setTransactionBufferSnapshotSegmentSize(theSizeOfSegment);
+ this.getPulsarServiceList().get(0).getConfig()
+
.setTransactionBufferSnapshotMaxTransactionCount(theCountOfSnapshotMaxTxnCount);
+ // 1. Build producer and consumer
+ Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
+ .topic(topic)
+ .enableBatching(false)
+ .create();
+
+ Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscribe();
+
+ // 2. Check the AbortedTxnProcessor workflow 10 times
+ int messageSize = theSizeOfSegment * 4;
+ for (int i = 0; i < 10; i++) {
+ MessageId maxReadMessage = null;
+ int abortedTxnSize = 0;
+ for (int j = 0; j < messageSize; j++) {
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5,
TimeUnit.MINUTES).build().get();
+ //Half common message and half transaction message.
+ if (j % 2 == 0) {
+ MessageId messageId =
producer.newMessage(transaction).value(i * 10 + j).send();
+ //And the transaction message have a half which are
aborted.
+ if (RandomUtils.nextInt() % 2 == 0) {
+ transaction.abort().get();
+ abortedTxns.add(messageId);
+ abortedTxnSize++;
+ } else {
+ ongoingTxns.put(transaction, messageId);
+ if (maxReadMessage == null) {
+ //The expected number of the messages that can be
read
+ maxReadMessage = messageId;
+ }
+ }
+ } else {
+ producer.newMessage().value(i * 10 + j).send();
+ transaction.commit().get();
+ }
+ }
+ // 2.1 Receive all message before the maxReadPosition to verify
the correctness of the max read position.
+ int hasReceived = 0;
+ while (true) {
+ Message<Integer> message = consumer.receive(2,
TimeUnit.SECONDS);
+ if (message != null) {
+
Assert.assertTrue(message.getMessageId().compareTo(maxReadMessage) < 0);
+ hasReceived ++;
+ } else {
+ break;
+ }
+ }
+ //2.2 Commit all ongoing transactions and verify that the consumer
can receive all remaining messages
+ // except for aborted txn messages.
+ for (Transaction ongoingTxn: ongoingTxns.keySet()) {
+ ongoingTxn.commit().get();
+ }
+ ongoingTxns.clear();
+ for (int k = hasReceived; k < messageSize - abortedTxnSize; k++) {
+ Message<Integer> message = consumer.receive(2,
TimeUnit.SECONDS);
+ assertNotNull(message);
+ assertFalse(abortedTxns.contains(message.getMessageId()));
+ }
+ }
+ // 3. After the topic unload, the consumer can receive all the
messages in the 10 tests
+ // except for the aborted transaction messages.
Review Comment:
There is already a clear-snapshot test, clearTransactionBufferSnapshotTest, in
TopicTransactionBufferRecoverTest,
and a trim test in testPutAbortedTxnIntoProcessor.
```java
//3. Delete the ledgers and then verify the data.
Field ledgersField =
ManagedLedgerImpl.class.getDeclaredField("ledgers");
ledgersField.setAccessible(true);
NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>
ledgers =
(NavigableMap<Long,
MLDataFormats.ManagedLedgerInfo.LedgerInfo>)
ledgersField.get(persistentTopic.getManagedLedger());
ledgers.forEach((k, v) -> {
ledgers.remove(k);
});
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]