This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new c8488713a9d [fix][test][branch-3.3] Fix flaky
AdminApiTransactionTest.testGetTransactionBufferInternalStats
c8488713a9d is described below
commit c8488713a9db2a6ff89e4c12d32b0ac18d95c89e
Author: Lari Hotari <[email protected]>
AuthorDate: Wed Oct 29 13:25:17 2025 +0200
[fix][test][branch-3.3] Fix flaky
AdminApiTransactionTest.testGetTransactionBufferInternalStats
- cherry-picked from
https://github.com/apache/pulsar/pull/23231/commits/c9e72c45bdc00525e83979b17433fa1f8e65ef39
---
.../broker/admin/v3/AdminApiTransactionTest.java | 81 ++++++++++++----------
1 file changed, 43 insertions(+), 38 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 5a192d0159a..e301f051da1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -49,8 +49,8 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.TransactionIsolationLevel;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TransactionIsolationLevel;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -625,21 +625,23 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
producer.newMessage(transaction).send();
transaction.abort().get();
- // Get transaction buffer internal stats and verify single snapshot
stats
- TransactionBufferInternalStats stats = admin.transactions()
- .getTransactionBufferInternalStatsAsync(topic2, true).get();
- assertEquals(stats.snapshotType,
AbortedTxnProcessor.SnapshotType.Single.toString());
- assertNotNull(stats.singleSnapshotSystemTopicInternalStats);
-
- // Get managed ledger internal stats for the transaction buffer
snapshot topic
- PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(
- TopicName.get(topic2).getNamespace() + "/" +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
-
verifyManagedLedgerInternalStats(stats.singleSnapshotSystemTopicInternalStats.managedLedgerInternalStats,
- internalStats);
-
assertTrue(stats.singleSnapshotSystemTopicInternalStats.managedLedgerName
- .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT));
- assertNull(stats.segmentInternalStats);
- assertNull(stats.segmentIndexInternalStats);
+ Awaitility.await().untilAsserted(() -> {
+ // Get transaction buffer internal stats and verify single
snapshot stats
+ TransactionBufferInternalStats stats = admin.transactions()
+ .getTransactionBufferInternalStatsAsync(topic2,
true).get();
+ assertEquals(stats.snapshotType,
AbortedTxnProcessor.SnapshotType.Single.toString());
+ assertNotNull(stats.singleSnapshotSystemTopicInternalStats);
+
+ // Get managed ledger internal stats for the transaction buffer
snapshot topic
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(
+ TopicName.get(topic2).getNamespace() + "/" +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
+
verifyManagedLedgerInternalStats(stats.singleSnapshotSystemTopicInternalStats.managedLedgerInternalStats,
+ internalStats);
+
assertTrue(stats.singleSnapshotSystemTopicInternalStats.managedLedgerName
+ .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT));
+ assertNull(stats.segmentInternalStats);
+ assertNull(stats.segmentIndexInternalStats);
+ });
// Configure segmented snapshot and set segment size
pulsar.getConfig().setTransactionBufferSnapshotSegmentSize(9);
@@ -651,28 +653,31 @@ public class AdminApiTransactionTest extends
MockedPulsarServiceBaseTest {
producer.newMessage(transaction).send();
transaction.abort().get();
- // Get transaction buffer internal stats and verify segmented snapshot
stats
- stats =
admin.transactions().getTransactionBufferInternalStatsAsync(topic3, true).get();
- assertEquals(stats.snapshotType,
AbortedTxnProcessor.SnapshotType.Segment.toString());
- assertNull(stats.singleSnapshotSystemTopicInternalStats);
- assertNotNull(stats.segmentInternalStats);
-
- // Get managed ledger internal stats for the transaction buffer
segments topic
- internalStats = admin.topics().getInternalStats(
- TopicName.get(topic2).getNamespace() + "/" +
- SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
-
verifyManagedLedgerInternalStats(stats.segmentInternalStats.managedLedgerInternalStats,
internalStats);
- assertTrue(stats.segmentInternalStats.managedLedgerName
-
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
-
- // Get managed ledger internal stats for the transaction buffer
indexes topic
- assertNotNull(stats.segmentIndexInternalStats);
- internalStats = admin.topics().getInternalStats(
- TopicName.get(topic2).getNamespace() + "/" +
- SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
-
verifyManagedLedgerInternalStats(stats.segmentIndexInternalStats.managedLedgerInternalStats,
internalStats);
- assertTrue(stats.segmentIndexInternalStats.managedLedgerName
-
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES));
+ Awaitility.await().untilAsserted(() -> {
+ // Get transaction buffer internal stats and verify segmented
snapshot stats
+ TransactionBufferInternalStats stats =
+
admin.transactions().getTransactionBufferInternalStatsAsync(topic3, true).get();
+ assertEquals(stats.snapshotType,
AbortedTxnProcessor.SnapshotType.Segment.toString());
+ assertNull(stats.singleSnapshotSystemTopicInternalStats);
+ assertNotNull(stats.segmentInternalStats);
+
+ // Get managed ledger internal stats for the transaction buffer
segments topic
+ PersistentTopicInternalStats internalStats =
admin.topics().getInternalStats(
+ TopicName.get(topic2).getNamespace() + "/" +
+
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+
verifyManagedLedgerInternalStats(stats.segmentInternalStats.managedLedgerInternalStats,
internalStats);
+ assertTrue(stats.segmentInternalStats.managedLedgerName
+
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));
+
+ // Get managed ledger internal stats for the transaction buffer
indexes topic
+ assertNotNull(stats.segmentIndexInternalStats);
+ internalStats = admin.topics().getInternalStats(
+ TopicName.get(topic2).getNamespace() + "/" +
+
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
+
verifyManagedLedgerInternalStats(stats.segmentIndexInternalStats.managedLedgerInternalStats,
internalStats);
+ assertTrue(stats.segmentIndexInternalStats.managedLedgerName
+
.contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES));
+ });
}