This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 0daa5aaeeb0 [improve][test] PIP-473: extensive v5 transaction tests on
scalable topics (#25958)
0daa5aaeeb0 is described below
commit 0daa5aaeeb0411c775cd7212a52df1737f7e0d75
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jun 8 09:12:49 2026 -0700
[improve][test] PIP-473: extensive v5 transaction tests on scalable topics
(#25958)
---
.../client/api/v5/V5TransactionRecoveryTest.java | 219 ++++++++++++
.../client/api/v5/V5TransactionScalableTest.java | 382 +++++++++++++++++++++
.../pulsar/client/api/v5/V5TransactionTest.java | 13 +-
.../transaction/TcMetadataDiscoveryTestBase.java | 10 +-
.../V5ScalableTopicTransactionOnOxiaTest.java | 46 +++
.../V5ScalableTopicTransactionTest.java | 346 +++++++++++++++++++
.../src/test/resources/pulsar-transaction.xml | 2 +
7 files changed, 1006 insertions(+), 12 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionRecoveryTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionRecoveryTest.java
new file mode 100644
index 00000000000..64fd9e88544
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionRecoveryTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import com.google.common.collect.Sets;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Recovery coverage for V5 transactions on scalable topics. Uses a single
restartable broker
+ * (the mock BookKeeper + metadata stores are reused across {@link
#restartBroker()}, so segment
+ * data and the {@code /txn} metadata layout survive a restart). After restart
a segment topic is
+ * reloaded cold, exercising {@code MetadataTransactionBuffer} / {@code
MetadataPendingAckStore}
+ * recovery: committed data must stay visible, aborted data must stay
filtered, and committed
+ * transactional acks must not be redelivered. Also covers the timeout sweep
aborting a dangling
+ * transaction.
+ */
+public class V5TransactionRecoveryTest extends MockedPulsarServiceBaseTest {
+
+ private final String myNamespace = "pulsar/txn-recovery";
+
+ private PulsarClient v5Client;
+
+ @BeforeMethod
+ @Override
+ protected void setup() throws Exception {
+ ServiceConfiguration config = getDefaultConf();
+ config.setTransactionCoordinatorEnabled(true);
+ config.setTopicLevelPoliciesEnabled(false);
+ super.internalSetup(config);
+
+ admin.clusters().createCluster("test",
+
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ // SYSTEM_NAMESPACE's tenant IS "pulsar" (pulsar/system), which also
owns myNamespace below.
+
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("test")));
+ admin.namespaces().createNamespace(myNamespace,
Sets.newHashSet("test"));
+
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
+
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+ new PartitionedTopicMetadata(1));
+
+ v5Client = newV5Client(Duration.ofMinutes(2));
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ if (v5Client != null) {
+ v5Client.close();
+ v5Client = null;
+ }
+ super.internalCleanup();
+ }
+
+ private PulsarClient newV5Client(Duration txnTimeout) throws Exception {
+ return PulsarClient.builder()
+ .serviceUrl(pulsar.getBrokerServiceUrl())
+
.transactionPolicy(TransactionPolicy.builder().timeout(txnTimeout).build())
+ .build();
+ }
+
+ private String newScalableTopic(int numInitialSegments) throws Exception {
+ String name = "topic://" + myNamespace + "/scalable-" +
UUID.randomUUID().toString().substring(0, 8);
+ admin.scalableTopics().createScalableTopic(name, numInitialSegments);
+ return name;
+ }
+
+ private QueueConsumer<String> subscribe(String topic, String sub) throws
Exception {
+ return v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(sub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+ }
+
+ /** Restart the broker (segment data + metadata survive) and rebuild the
V5 client. */
+ private void restartBrokerAndReconnect() throws Exception {
+ v5Client.close();
+ restartBroker();
+ v5Client = newV5Client(Duration.ofMinutes(2));
+ }
+
+ @Test
+ public void testCommittedMessagesVisibleAfterBrokerRestart() throws
Exception {
+ // Commit a transaction, then restart so the segment topic is reloaded
cold. The transaction
+ // buffer recovers from the /txn metadata and re-exposes the committed
messages.
+ String topic = newScalableTopic(2);
+ @Cleanup
+ Producer<String> producer =
v5Client.newProducer(Schema.string()).topic(topic).create();
+
+ Transaction txn = v5Client.newTransaction();
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ String v = "v-" + i;
+ producer.newMessage().key("k-" +
i).transaction(txn).value(v).send();
+ sent.add(v);
+ }
+ txn.commit();
+ producer.close();
+
+ restartBrokerAndReconnect();
+
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(topic, "after-restart-sub");
+ Set<String> got = new HashSet<>();
+ for (int i = 0; i < sent.size(); i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(15));
+ assertNotNull(msg, "committed message #" + i + " must survive
restart");
+ got.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ assertEquals(got, sent, "every committed message must be visible after
recovery");
+ assertNull(consumer.receive(Duration.ofMillis(500)), "no extra
messages");
+ }
+
+ @Test
+ public void testAbortedMessagesFilteredAfterBrokerRestart() throws
Exception {
+ // Abort a transaction and publish a non-transactional sentinel, then
restart. Recovery must
+ // rebuild the aborted set (from the durable ABORTED header / aborted
records) so the aborted
+ // messages stay filtered and only the sentinel is delivered.
+ String topic = newScalableTopic(1);
+ @Cleanup
+ Producer<String> producer =
v5Client.newProducer(Schema.string()).topic(topic).create();
+
+ Transaction txn = v5Client.newTransaction();
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().transaction(txn).value("aborted-" +
i).send();
+ }
+ txn.abort();
+ producer.newMessage().value("sentinel").send();
+ producer.close();
+
+ restartBrokerAndReconnect();
+
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(topic,
"after-restart-abort-sub");
+ Message<String> msg = consumer.receive(Duration.ofSeconds(15));
+ assertNotNull(msg, "the non-transactional sentinel must be delivered
after restart");
+ assertEquals(msg.value(), "sentinel", "aborted messages must stay
filtered after recovery");
+ consumer.acknowledge(msg.id());
+ assertNull(consumer.receive(Duration.ofMillis(500)), "no aborted
messages after the sentinel");
+ }
+
+ @Test
+ public void testTransactionalAcksSurviveBrokerRestart() throws Exception {
+ // Acknowledge messages inside a committed transaction, then restart.
The pending-ack store
+ // recovery (and the durable cursor) must keep them acknowledged — no
redelivery.
+ String topic = newScalableTopic(1);
+ @Cleanup
+ Producer<String> producer =
v5Client.newProducer(Schema.string()).topic(topic).create();
+ int n = 10;
+ for (int i = 0; i < n; i++) {
+ producer.newMessage().value("m-" + i).send();
+ }
+
+ // Acknowledge all inside the transaction, keeping the consumer open
through commit so the
+ // async ack operations complete before commit (closing it early fails
them).
+ Transaction txn = v5Client.newTransaction();
+ QueueConsumer<String> consumer = subscribe(topic, "ack-sub");
+ for (int i = 0; i < n; i++) {
+ Message<String> m = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(m, "delivery #" + i);
+ consumer.acknowledge(m.id(), txn);
+ }
+ txn.commit();
+ consumer.close();
+
+ // Pre-restart: confirm the committed acks have materialised (a fresh
consumer on the same
+ // subscription sees nothing), so the restart genuinely exercises
recovery of acked state.
+ {
+ @Cleanup
+ QueueConsumer<String> check = subscribe(topic, "ack-sub");
+ assertNull(check.receive(Duration.ofSeconds(10)), "committed acks
must materialise before restart");
+ }
+
+ restartBrokerAndReconnect();
+
+ @Cleanup
+ QueueConsumer<String> after = subscribe(topic, "ack-sub");
+ assertNull(after.receive(Duration.ofSeconds(10)),
+ "committed transactional acks must not be redelivered after
restart");
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionScalableTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionScalableTest.java
new file mode 100644
index 00000000000..bf8ef8e84eb
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionScalableTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.policies.data.ScalableTopicMetadata;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Extended coverage for V5 transactions on scalable topics: multi-segment
producers, many
+ * concurrent transactions with mixed commit/abort, abort across layout
changes (split / merge),
+ * large transactions, transactional acknowledgement lifecycle,
per-subscription isolation, and
+ * read-committed visibility while a transaction is still open.
+ *
+ * <p>Complements {@link V5TransactionTest} (single-segment happy paths +
commit-across-split/merge).
+ */
+public class V5TransactionScalableTest extends V5ClientBaseTest {
+
+ private PulsarClient newTxnClient() throws Exception {
+ return track(PulsarClient.builder()
+ .serviceUrl(getBrokerServiceUrl())
+
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(2)).build())
+ .build());
+ }
+
+ private QueueConsumer<String> subscribe(PulsarClient client, String topic,
String sub) throws Exception {
+ return client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(sub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+ }
+
+ /** Receive exactly {@code count} messages (acking each), failing if any
is missing. */
+ private Set<String> receiveValues(QueueConsumer<String> consumer, int
count, Duration perMessage)
+ throws Exception {
+ Set<String> values = new HashSet<>();
+ for (int i = 0; i < count; i++) {
+ Message<String> msg = consumer.receive(perMessage);
+ assertNotNull(msg, "missing message #" + i + " (received so far: "
+ values.size() + ")");
+ values.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ return values;
+ }
+
+ private long firstActiveSegment(String topic) throws Exception {
+ ScalableTopicMetadata meta = admin.scalableTopics().getMetadata(topic);
+ for (ScalableTopicMetadata.SegmentInfo seg :
meta.getSegments().values()) {
+ if (seg.isActive()) {
+ return seg.getSegmentId();
+ }
+ }
+ throw new IllegalStateException("no active segment for " + topic);
+ }
+
+ private List<Long> activeSegments(String topic) throws Exception {
+ ScalableTopicMetadata meta = admin.scalableTopics().getMetadata(topic);
+ List<Long> ids = new ArrayList<>();
+ for (ScalableTopicMetadata.SegmentInfo seg :
meta.getSegments().values()) {
+ if (seg.isActive()) {
+ ids.add(seg.getSegmentId());
+ }
+ }
+ return ids;
+ }
+
+ private void awaitActiveSegmentCount(String topic, int expected) {
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegments(topic).size(), expected));
+ }
+
+ @Test
+ public void testMultiSegmentTransactionCommit() throws Exception {
+ // A single transaction whose keyed writes spread across all segments
of a multi-segment
+ // topic commits atomically: every message becomes visible together.
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(3);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(client, topic,
"multi-seg-sub");
+
+ Transaction txn = client.newTransaction();
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < 60; i++) {
+ String v = "v-" + i;
+ producer.newMessage().key("k-" +
i).transaction(txn).value(v).send();
+ sent.add(v);
+ }
+ assertNull(consumer.receive(Duration.ofMillis(500)), "nothing visible
before commit");
+
+ txn.commit();
+ assertEquals(receiveValues(consumer, sent.size(),
Duration.ofSeconds(10)), sent,
+ "every committed message across all segments must be
delivered");
+ assertNull(consumer.receive(Duration.ofMillis(500)), "no extra
messages");
+ }
+
+ @Test
+ public void testConcurrentTransactionsMixedCommitAbort() throws Exception {
+ // Many open transactions on one topic, interleaved; half commit, half
abort. Only the
+ // committed transactions' messages are ever delivered. While any
transaction stays open the
+ // buffer pins visibility, so nothing is visible until all resolve.
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(2);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(client, topic,
"concurrent-sub");
+
+ int numTxns = 6;
+ int perTxn = 10;
+ List<Transaction> txns = new ArrayList<>();
+ for (int t = 0; t < numTxns; t++) {
+ txns.add(client.newTransaction());
+ }
+ Set<String> committedExpected = new HashSet<>();
+ for (int i = 0; i < perTxn; i++) {
+ for (int t = 0; t < numTxns; t++) {
+ String v = "t" + t + "-m" + i;
+ producer.newMessage().key("t" +
t).transaction(txns.get(t)).value(v).send();
+ if (t % 2 == 0) {
+ committedExpected.add(v);
+ }
+ }
+ }
+ assertNull(consumer.receive(Duration.ofMillis(500)), "nothing visible
while transactions are open");
+
+ for (int t = 0; t < numTxns; t++) {
+ if (t % 2 == 0) {
+ txns.get(t).commit();
+ } else {
+ txns.get(t).abort();
+ }
+ }
+
+ assertEquals(receiveValues(consumer, committedExpected.size(),
Duration.ofSeconds(10)),
+ committedExpected, "exactly the committed transactions'
messages must be delivered");
+ assertNull(consumer.receive(Duration.ofSeconds(1)), "aborted
transactions' messages must never appear");
+ }
+
+ @Test
+ public void testAbortSpansSplit() throws Exception {
+ // A transaction spanning a split that aborts must leave nothing
visible from either the
+ // sealed parent or the new children. A non-transactional sentinel
proves the consumer is
+ // live and that only it is delivered.
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(client, topic,
"abort-split-sub");
+
+ Transaction txn = client.newTransaction();
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().key("k-" +
i).transaction(txn).value("before-split-" + i).send();
+ }
+ admin.scalableTopics().splitSegment(topic, firstActiveSegment(topic));
+ awaitActiveSegmentCount(topic, 2);
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().key("k-after-" +
i).transaction(txn).value("after-split-" + i).send();
+ }
+ txn.abort();
+
+ producer.newMessage().value("sentinel").send();
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "non-transactional sentinel must be delivered");
+ assertEquals(msg.value(), "sentinel", "aborted txn messages must not
precede the sentinel");
+ consumer.acknowledge(msg.id());
+ assertNull(consumer.receive(Duration.ofMillis(500)), "no aborted
messages after the sentinel");
+ }
+
+ @Test
+ public void testAbortSpansMerge() throws Exception {
+ // Same as the split case but across a merge: an aborted transaction
whose lifetime spans a
+ // merge leaves nothing visible.
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(2);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(client, topic,
"abort-merge-sub");
+
+ Transaction txn = client.newTransaction();
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().key("k-" +
i).transaction(txn).value("before-merge-" + i).send();
+ }
+ List<Long> active = activeSegments(topic);
+ assertEquals(active.size(), 2, "expected 2 active segments before
merge");
+ admin.scalableTopics().mergeSegments(topic, active.get(0),
active.get(1));
+ awaitActiveSegmentCount(topic, 1);
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().key("k-after-" +
i).transaction(txn).value("after-merge-" + i).send();
+ }
+ txn.abort();
+
+ producer.newMessage().value("sentinel").send();
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "non-transactional sentinel must be delivered");
+ assertEquals(msg.value(), "sentinel");
+ consumer.acknowledge(msg.id());
+ assertNull(consumer.receive(Duration.ofMillis(500)), "no aborted
messages after the sentinel");
+ }
+
+ @Test
+ public void testLargeTransaction() throws Exception {
+ // A large transaction commits atomically and every message is
delivered exactly once.
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(2);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(client, topic,
"large-txn-sub");
+
+ int n = 500;
+ Transaction txn = client.newTransaction();
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ String v = "v-" + i;
+ producer.newMessage().key("k-" + (i %
50)).transaction(txn).value(v).send();
+ sent.add(v);
+ }
+ txn.commit();
+
+ assertEquals(receiveValues(consumer, n, Duration.ofSeconds(15)), sent,
+ "all messages in the large transaction must be delivered");
+ assertNull(consumer.receive(Duration.ofSeconds(1)), "no extra
messages");
+ }
+
+ @Test
+ public void testTransactionalAckAbortRedeliversThenCommitSticks() throws
Exception {
+ // Acknowledge messages inside a transaction that aborts -> the acks
roll back and the
+ // messages are redelivered. Acknowledging the redelivered batch in a
committed transaction
+ // makes the acks durable -> no further redelivery.
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ int n = 10;
+ Set<String> produced = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ String v = "m-" + i;
+ producer.newMessage().value(v).send();
+ produced.add(v);
+ }
+
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(client, topic,
"ack-lifecycle-sub");
+
+ // Consume all, ack inside a transaction we then abort.
+ List<Message<String>> first = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ Message<String> m = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(m, "initial delivery #" + i);
+ first.add(m);
+ }
+ Transaction abortTxn = client.newTransaction();
+ for (Message<String> m : first) {
+ consumer.acknowledge(m.id(), abortTxn);
+ }
+ abortTxn.abort();
+
+ // After abort the messages must be redelivered. Collect them in a
committed transaction.
+ Transaction commitTxn = client.newTransaction();
+ Set<String> redelivered = new HashSet<>();
+ long deadline = System.currentTimeMillis() + 30_000;
+ while (redelivered.size() < n && System.currentTimeMillis() <
deadline) {
+ Message<String> m = consumer.receive(Duration.ofSeconds(2));
+ if (m == null) {
+ continue;
+ }
+ redelivered.add(m.value());
+ consumer.acknowledge(m.id(), commitTxn);
+ }
+ assertEquals(redelivered, produced, "aborting a transactional ack must
redeliver every message");
+ commitTxn.commit();
+
+ assertNull(consumer.receive(Duration.ofSeconds(2)),
+ "messages acked in a committed transaction must not be
redelivered");
+ }
+
+ @Test
+ public void testMultipleSubscriptionsIndependentTransactionalAcks() throws
Exception {
+ // A committed transactional ack on one subscription must not affect
another subscription on
+ // the same scalable topic.
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ int n = 10;
+ Set<String> produced = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ String v = "m-" + i;
+ producer.newMessage().value(v).send();
+ produced.add(v);
+ }
+
+ @Cleanup
+ QueueConsumer<String> subA = subscribe(client, topic, "sub-a");
+ @Cleanup
+ QueueConsumer<String> subB = subscribe(client, topic, "sub-b");
+
+ Transaction txn = client.newTransaction();
+ for (int i = 0; i < n; i++) {
+ Message<String> m = subA.receive(Duration.ofSeconds(5));
+ assertNotNull(m, "sub-a delivery #" + i);
+ subA.acknowledge(m.id(), txn);
+ }
+ txn.commit();
+ assertNull(subA.receive(Duration.ofSeconds(2)), "sub-a committed acks
must stick");
+
+ // sub-b is independent and must still see everything.
+ assertEquals(receiveValues(subB, n, Duration.ofSeconds(5)), produced,
+ "the other subscription must be unaffected by sub-a's
transactional acks");
+ }
+
+ @Test
+ public void testOpenTransactionPinsLaterNonTransactionalWrites() throws
Exception {
+ // Read-committed visibility: while a transaction is open, the
buffer's max-read position is
+ // pinned below its first write, so even a non-transactional message
published afterwards
+ // stays invisible until the transaction resolves. After abort, the
non-transactional message
+ // becomes visible and the aborted ones are filtered.
+ PulsarClient client = newTxnClient();
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(client, topic, "pin-sub");
+
+ Transaction txn = client.newTransaction();
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().transaction(txn).value("txn-" + i).send();
+ }
+ assertNull(consumer.receive(Duration.ofSeconds(1)), "uncommitted
writes are invisible");
+
+ producer.newMessage().value("sentinel").send();
+ assertNull(consumer.receive(Duration.ofSeconds(1)),
+ "a non-transactional write after an open txn is pinned until
the txn resolves");
+
+ txn.abort();
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "after abort the non-transactional write becomes
visible");
+ assertEquals(msg.value(), "sentinel", "only the non-transactional
write is delivered");
+ consumer.acknowledge(msg.id());
+ assertNull(consumer.receive(Duration.ofMillis(500)), "aborted txn
messages must stay filtered");
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
index 6cdc197709f..59057de8bf3 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5TransactionTest.java
@@ -191,13 +191,7 @@ public class V5TransactionTest extends V5ClientBaseTest {
assertEquals(mb.value(), "b-1");
}
- // Disabled: documents a real broker gap in the per-segment
TransactionBuffer model.
- // After split, the parent segment is sealed; the txn coordinator's COMMIT
END-TXN to
- // the sealed parent never returns and the commit RPC times out (~30s).
The fix
- // requires moving transaction tracking up to the scalable-topic level (so
layout
- // changes don't strand in-flight transactions on now-sealed segments) —
that's
- // beyond the scope of this commit.
- @Test(enabled = false)
+ @Test
public void testCommitSpansSplit() throws Exception {
// A single transaction whose lifetime spans a layout-changing split
must commit
// atomically: pre-split writes (on the now-sealed parent) and
post-split writes
@@ -270,10 +264,7 @@ public class V5TransactionTest extends V5ClientBaseTest {
assertEquals(received, sent, "all txn messages across the split must
be delivered after commit");
}
- // Disabled: same broker gap as testCommitSpansSplit. After merge, both
source
- // segments are sealed and the COMMIT marker can't be delivered, so the
END-TXN
- // request times out.
- @Test(enabled = false)
+ @Test
public void testCommitSpansMerge() throws Exception {
// A single transaction whose lifetime spans a layout-changing merge
must commit
// atomically: writes to the two pre-merge segments and writes to the
post-merge
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
index 8af84ab229e..05ed5747f64 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
@@ -64,8 +64,16 @@ public abstract class TcMetadataDiscoveryTestBase extends
PulsarTestSuite {
BrokerContainer brokerContainer =
pulsarCluster.getBrokers().iterator().next();
brokerContainer.execCmd(
"/pulsar/bin/pulsar",
"initialize-transaction-coordinator-metadata",
- "-cs", ZKContainer.NAME,
+ "-cs", configurationStoreConnectionString(),
"-c", pulsarCluster.getClusterName(),
"--initial-num-transaction-coordinators",
Integer.toString(TC_PARALLELISM));
}
+
+ /**
+ * Configuration-store connection string used to initialize the
transaction-coordinator metadata.
+ * Defaults to the ZooKeeper container; an Oxia-backed subclass overrides
this with the Oxia URL.
+ */
+ protected String configurationStoreConnectionString() {
+ return ZKContainer.NAME;
+ }
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/V5ScalableTopicTransactionOnOxiaTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/V5ScalableTopicTransactionOnOxiaTest.java
new file mode 100644
index 00000000000..e28030b4a1d
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/V5ScalableTopicTransactionOnOxiaTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.transaction;
+
+import lombok.CustomLog;
+import org.apache.pulsar.tests.integration.oxia.OxiaContainer;
+import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
+
+/**
+ * Runs the full {@link V5ScalableTopicTransactionTest} suite against a
cluster whose metadata store
+ * is a containerized Oxia instead of ZooKeeper. Oxia is the backend PIP-473
targets natively — its
+ * {@code scanByIndex} / {@code subscribeSequence} / partition-key /
sequence-delta primitives back the
+ * {@code /txn} layout directly, rather than the scan-and-filter fallback
ZooKeeper uses — so this is
+ * the closest the test suite gets to the production transaction path.
+ */
+@CustomLog
+public class V5ScalableTopicTransactionOnOxiaTest extends
V5ScalableTopicTransactionTest {
+
+ @Override
+ protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
+ String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder
specBuilder) {
+ specBuilder.enableOxia(true);
+ return specBuilder;
+ }
+
+ @Override
+ protected String configurationStoreConnectionString() {
+ return "oxia://" + OxiaContainer.NAME + ":" + OxiaContainer.OXIA_PORT;
+ }
+}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/V5ScalableTopicTransactionTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/V5ScalableTopicTransactionTest.java
new file mode 100644
index 00000000000..0b5b62acd6c
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/V5ScalableTopicTransactionTest.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.transaction;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.Cleanup;
+import lombok.CustomLog;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.policies.data.ScalableTopicMetadata;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end integration coverage for V5 transactions producing and consuming
data on scalable
+ * ({@code topic://}) topics across a multi-broker docker cluster: commit
visibility, abort filtering,
+ * multi-topic atomic commit, consume-transform-produce, survival of a broker
failover mid-transaction,
+ * and transactions whose lifetime spans a layout-changing split / merge.
+ */
+@CustomLog
+public class V5ScalableTopicTransactionTest extends
TcMetadataDiscoveryTestBase {
+
+ private static final int OP_TIMEOUT_SECONDS = 30;
+
+ private PulsarAdmin newAdmin() throws Exception {
+ return
PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+ }
+
+ private PulsarClient newTxnClient() throws Exception {
+ return PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(5)).build())
+ .build();
+ }
+
+ private String scalableTopicName() {
+ return "topic://public/default/scalable-" + randomName(8);
+ }
+
+ private QueueConsumer<String> subscribe(PulsarClient client, String topic,
String sub) throws Exception {
+ return client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(sub)
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+ }
+
+ private Set<String> drain(QueueConsumer<String> consumer, int count)
throws Exception {
+ Set<String> values = new HashSet<>();
+ for (int i = 0; i < count; i++) {
+ Message<String> msg =
consumer.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS));
+ assertNotNull(msg, "missing message #" + i + " (received so far: "
+ values.size() + ")");
+ values.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ return values;
+ }
+
+ private List<Long> activeSegments(PulsarAdmin admin, String topic) throws
Exception {
+ ScalableTopicMetadata meta = admin.scalableTopics().getMetadata(topic);
+ List<Long> ids = new ArrayList<>();
+ for (ScalableTopicMetadata.SegmentInfo seg :
meta.getSegments().values()) {
+ if (seg.isActive()) {
+ ids.add(seg.getSegmentId());
+ }
+ }
+ return ids;
+ }
+
+ @Test(timeOut = 300_000)
+ public void testProduceConsumeCommitAndAbort() throws Exception {
+ @Cleanup
+ PulsarAdmin admin = newAdmin();
+ @Cleanup
+ PulsarClient client = newTxnClient();
+ String topic = scalableTopicName();
+ admin.scalableTopics().createScalableTopic(topic, 2);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(client, topic, "ce-sub");
+
+ // Commit path.
+ Transaction commit = client.newTransaction();
+ Set<String> committed = new HashSet<>();
+ for (int i = 0; i < 20; i++) {
+ String v = "commit-" + i;
+ producer.newMessage().key("k-" +
i).transaction(commit).value(v).send();
+ committed.add(v);
+ }
+ assertNull(consumer.receive(Duration.ofSeconds(2)), "nothing visible
before commit");
+ commit.commit();
+ assertEquals(drain(consumer, committed.size()), committed, "committed
messages must all be delivered");
+
+ // Abort path: aborted messages never delivered; a sentinel proves the
consumer is live.
+ Transaction abort = client.newTransaction();
+ for (int i = 0; i < 20; i++) {
+ producer.newMessage().key("k-" +
i).transaction(abort).value("abort-" + i).send();
+ }
+ abort.abort();
+ producer.newMessage().value("sentinel").send();
+ Message<String> msg =
consumer.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS));
+ assertNotNull(msg, "sentinel must be delivered");
+ assertEquals(msg.value(), "sentinel", "aborted messages must not
precede the sentinel");
+ consumer.acknowledge(msg.id());
+ assertNull(consumer.receive(Duration.ofSeconds(2)), "no aborted
messages after the sentinel");
+ }
+
+ @Test(timeOut = 300_000)
+ public void testMultiTopicAtomicCommit() throws Exception {
+ @Cleanup
+ PulsarAdmin admin = newAdmin();
+ @Cleanup
+ PulsarClient client = newTxnClient();
+ String topicA = scalableTopicName();
+ String topicB = scalableTopicName();
+ admin.scalableTopics().createScalableTopic(topicA, 2);
+ admin.scalableTopics().createScalableTopic(topicB, 2);
+
+ @Cleanup
+ Producer<String> prodA =
client.newProducer(Schema.string()).topic(topicA).create();
+ @Cleanup
+ Producer<String> prodB =
client.newProducer(Schema.string()).topic(topicB).create();
+ @Cleanup
+ QueueConsumer<String> consA = subscribe(client, topicA, "multi-a");
+ @Cleanup
+ QueueConsumer<String> consB = subscribe(client, topicB, "multi-b");
+
+ Transaction txn = client.newTransaction();
+ prodA.newMessage().value("a-1").send();
+ prodB.newMessage().value("b-1").send();
+ // The above are non-transactional sends to confirm the topics work;
now the transactional ones:
+ prodA.newMessage().transaction(txn).value("a-txn").send();
+ prodB.newMessage().transaction(txn).value("b-txn").send();
+
+ // The non-transactional a-1/b-1 are pinned behind the open
transaction's first write only if
+ // they were published after it; here they were published before, so
they are visible.
+
assertEquals(consA.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS)).value(),
"a-1");
+
assertEquals(consB.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS)).value(),
"b-1");
+ assertNull(consA.receive(Duration.ofSeconds(2)), "txn message on A
invisible pre-commit");
+ assertNull(consB.receive(Duration.ofSeconds(2)), "txn message on B
invisible pre-commit");
+
+ txn.commit();
+
assertEquals(consA.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS)).value(),
"a-txn");
+
assertEquals(consB.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS)).value(),
"b-txn");
+ }
+
+ @Test(timeOut = 300_000)
+ public void testConsumeTransformProduce() throws Exception {
+ @Cleanup
+ PulsarAdmin admin = newAdmin();
+ @Cleanup
+ PulsarClient client = newTxnClient();
+ String input = scalableTopicName();
+ String output = scalableTopicName();
+ admin.scalableTopics().createScalableTopic(input, 1);
+ admin.scalableTopics().createScalableTopic(output, 1);
+
+ @Cleanup
+ Producer<String> seed =
client.newProducer(Schema.string()).topic(input).create();
+ seed.newMessage().value("hello").send();
+
+ @Cleanup
+ QueueConsumer<String> in = subscribe(client, input, "in-sub");
+ @Cleanup
+ Producer<String> out =
client.newProducer(Schema.string()).topic(output).create();
+ @Cleanup
+ QueueConsumer<String> verify = subscribe(client, output, "verify-sub");
+
+ Message<String> seedMsg =
in.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS));
+ assertNotNull(seedMsg, "seed message must be delivered");
+
+ Transaction txn = client.newTransaction();
+
out.newMessage().transaction(txn).value(seedMsg.value().toUpperCase()).send();
+ in.acknowledge(seedMsg.id(), txn);
+ assertNull(verify.receive(Duration.ofSeconds(2)), "output invisible
before commit");
+
+ txn.commit();
+ Message<String> outMsg =
verify.receive(Duration.ofSeconds(OP_TIMEOUT_SECONDS));
+ assertNotNull(outMsg, "committed output must be delivered");
+ assertEquals(outMsg.value(), "HELLO");
+ }
+
+ @Test(timeOut = 360_000)
+ public void testTransactionSurvivesBrokerFailover() throws Exception {
+ @Cleanup
+ PulsarAdmin admin = newAdmin();
+ @Cleanup
+ PulsarClient client = newTxnClient();
+ String topic = scalableTopicName();
+ // Several segments so they spread across both brokers.
+ admin.scalableTopics().createScalableTopic(topic, 4);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(client, topic,
"failover-sub");
+
+ Transaction txn = client.newTransaction();
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ String v = "before-kill-" + i;
+ producer.newMessage().key("k-" +
i).transaction(txn).value(v).send();
+ sent.add(v);
+ }
+
+ // Kill one broker mid-transaction; coordinator partitions and
segments it led are reassigned.
+ BrokerContainer victim = pulsarCluster.getBrokers().iterator().next();
+ log.info().attr("broker", victim.getContainerName()).log("Stopping
broker mid-transaction");
+ victim.stop();
+
+ // Continue producing in the same transaction; the producer reconnects
to the survivor. Retry
+ // each send within a bounded window while reassignment/reconnection
settles.
+ for (int i = 0; i < 10; i++) {
+ String v = "after-kill-" + i;
+ sendWithRetry(producer, txn, "k-after-" + i, v);
+ sent.add(v);
+ }
+
+ txn.commit();
+ assertEquals(drain(consumer, sent.size()), sent,
+ "every message in a transaction that spanned a broker failover
must be delivered");
+ }
+
+ @Test(timeOut = 360_000)
+ public void testCommitSpansSplit() throws Exception {
+ @Cleanup
+ PulsarAdmin admin = newAdmin();
+ @Cleanup
+ PulsarClient client = newTxnClient();
+ String topic = scalableTopicName();
+ admin.scalableTopics().createScalableTopic(topic, 1);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(client, topic, "split-sub");
+
+ Transaction txn = client.newTransaction();
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < 5; i++) {
+ String v = "before-split-" + i;
+ producer.newMessage().key("k-" +
i).transaction(txn).value(v).send();
+ sent.add(v);
+ }
+ List<Long> active = activeSegments(admin, topic);
+ assertEquals(active.size(), 1, "expected one active segment before
split");
+ admin.scalableTopics().splitSegment(topic, active.get(0));
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegments(admin, topic).size(), 2));
+
+ for (int i = 0; i < 5; i++) {
+ String v = "after-split-" + i;
+ sendWithRetry(producer, txn, "k-after-" + i, v);
+ sent.add(v);
+ }
+ assertNull(consumer.receive(Duration.ofSeconds(2)), "nothing visible
before commit");
+ txn.commit();
+ assertEquals(drain(consumer, sent.size()), sent, "all messages across
the split must be delivered");
+ }
+
+ @Test(timeOut = 360_000)
+ public void testCommitSpansMerge() throws Exception {
+ @Cleanup
+ PulsarAdmin admin = newAdmin();
+ @Cleanup
+ PulsarClient client = newTxnClient();
+ String topic = scalableTopicName();
+ admin.scalableTopics().createScalableTopic(topic, 2);
+
+ @Cleanup
+ Producer<String> producer =
client.newProducer(Schema.string()).topic(topic).create();
+ @Cleanup
+ QueueConsumer<String> consumer = subscribe(client, topic, "merge-sub");
+
+ Transaction txn = client.newTransaction();
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ String v = "before-merge-" + i;
+ producer.newMessage().key("k-" +
i).transaction(txn).value(v).send();
+ sent.add(v);
+ }
+ List<Long> active = activeSegments(admin, topic);
+ assertEquals(active.size(), 2, "expected two active segments before
merge");
+ admin.scalableTopics().mergeSegments(topic, active.get(0),
active.get(1));
+ Awaitility.await().untilAsserted(() ->
assertEquals(activeSegments(admin, topic).size(), 1));
+
+ for (int i = 0; i < 5; i++) {
+ String v = "after-merge-" + i;
+ sendWithRetry(producer, txn, "k-after-" + i, v);
+ sent.add(v);
+ }
+ assertNull(consumer.receive(Duration.ofSeconds(2)), "nothing visible
before commit");
+ txn.commit();
+ assertEquals(drain(consumer, sent.size()), sent, "all messages across
the merge must be delivered");
+ }
+
+ /** Send a transactional message, retrying within a bounded window while
the cluster re-settles. */
+ private void sendWithRetry(Producer<String> producer, Transaction txn,
String key, String value)
+ throws Exception {
+ long deadline = System.currentTimeMillis() + 60_000;
+ Exception last = null;
+ while (System.currentTimeMillis() < deadline) {
+ try {
+
producer.newMessage().key(key).transaction(txn).value(value).send();
+ return;
+ } catch (Exception e) {
+ last = e;
+ Thread.sleep(1000);
+ }
+ }
+ assertTrue(false, "send did not succeed within the retry window: " +
(last == null ? "?" : last));
+ }
+}
diff --git a/tests/integration/src/test/resources/pulsar-transaction.xml
b/tests/integration/src/test/resources/pulsar-transaction.xml
index 0c23b9e93ab..b69d5f31288 100644
--- a/tests/integration/src/test/resources/pulsar-transaction.xml
+++ b/tests/integration/src/test/resources/pulsar-transaction.xml
@@ -24,6 +24,8 @@
<classes>
<class
name="org.apache.pulsar.tests.integration.transaction.TransactionTest" />
<class
name="org.apache.pulsar.tests.integration.transaction.TcMetadataDiscoveryTest"
/>
+ <class
name="org.apache.pulsar.tests.integration.transaction.V5ScalableTopicTransactionTest"
/>
+ <class
name="org.apache.pulsar.tests.integration.transaction.V5ScalableTopicTransactionOnOxiaTest"
/>
</classes>
</test>
</suite>