This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c6705719338 [improve][broker] PIP-473 P5.4: v4/v5 transaction coordinator coexistence + enable v5 by default (#25945)
c6705719338 is described below

commit c67057193380577930bee5b016adce19f9ad4ea0
Author: Matteo Merli <[email protected]>
AuthorDate: Sun Jun 7 06:35:47 2026 -0700

    [improve][broker] PIP-473 P5.4: v4/v5 transaction coordinator coexistence + enable v5 by default (#25945)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  34 ++--
 .../apache/pulsar/broker/service/ServerCnx.java    |  72 +++++--
 .../buffer/impl/MetadataTransactionBuffer.java     |  23 ++-
 .../pendingack/impl/MetadataPendingAckStore.java   |  17 +-
 .../pendingack/PendingAckPersistentTest.java       |   7 +-
 .../client/impl/v5/PulsarClientBuilderV5.java      |   4 +
 .../client/impl/TransactionMetaStoreHandler.java   |  26 ++-
 .../client/impl/conf/ClientConfigurationData.java  |   7 +
 .../TransactionCoordinatorClientImpl.java          |  41 ++--
 .../apache/pulsar/common/protocol/Commands.java    |  37 +++-
 pulsar-common/src/main/proto/PulsarApi.proto       |  12 ++
 .../protocol/CommandsScalableTxnFlagTest.java      |  92 +++++++++
 .../apache/pulsar/testclient/PerfClientUtils.java  |  26 +++
 .../pulsar/testclient/PerformanceConsumer.java     |  33 ++--
 .../pulsar/testclient/PerformanceProducer.java     |   3 +-
 .../pulsar/testclient/PerformanceTransaction.java  |  31 ++-
 .../Oauth2PerformanceTransactionTest.java          |  84 ++++----
 .../testclient/PerformanceTransactionTest.java     | 209 ++++++++++----------
 tests/integration/build.gradle.kts                 |   2 +
 .../transaction/TcMetadataDiscoveryTest.java       | 213 +++++++++++----------
 20 files changed, 631 insertions(+), 342 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index f10b2cbdc32..7771b43059f 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3798,13 +3798,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
     @FieldContext(
             category = CATEGORY_TRANSACTION,
             doc = "Enable the metadata-driven transaction coordinator used by 
scalable topics."
-                    + " When true, wire commands (NEW_TXN / END_TXN / etc.) 
are served by the"
-                    + " metadata-store-backed coordinator instead of the 
legacy"
-                    + " TransactionMetadataStoreService. Requires 
transactionCoordinatorEnabled"
-                    + " = true, and must be enabled together with the 
scalable-topic transaction"
-                    + " buffer and pending-ack store providers."
+                    + " When true, transaction wire commands flagged as 
scalable (sent by v5 SDK"
+                    + " clients) are served by the metadata-store-backed 
coordinator, while legacy"
+                    + " (v4) clients continue to be served by 
TransactionMetadataStoreService — the"
+                    + " two coexist on the same cluster. Requires 
transactionCoordinatorEnabled"
+                    + " = true. Enabled by default together with the 
dispatching transaction buffer"
+                    + " and pending-ack store providers."
     )
-    private boolean transactionCoordinatorScalableTopicsEnabled = false;
+    private boolean transactionCoordinatorScalableTopicsEnabled = true;
 
     @FieldContext(
             category = CATEGORY_TRANSACTION,
@@ -3859,21 +3860,26 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
 
     @FieldContext(
             category = CATEGORY_TRANSACTION,
-            doc = "Class name for transaction buffer provider. Default routes 
segment:// topics to the"
-                    + " legacy TopicTransactionBuffer. Set this to"
-                    + " 
org.apache.pulsar.broker.transaction.buffer.impl.DispatchingTransactionBufferProvider"
-                    + " once the v5 transaction coordinator (PIP-473 P5) is 
enabled to opt segment topics"
-                    + " into MetadataTransactionBuffer."
+            doc = "Class name for transaction buffer provider. The default 
DispatchingTransactionBufferProvider"
+                    + " routes segment:// topics to the metadata-driven 
MetadataTransactionBuffer (PIP-473)"
+                    + " and persistent:// / topic:// topics to the legacy 
TopicTransactionBuffer. Set this to"
+                    + " 
org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider 
to"
+                    + " force the legacy buffer for all topics."
     )
     private String transactionBufferProviderClassName =
-            
"org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider";
+            
"org.apache.pulsar.broker.transaction.buffer.impl.DispatchingTransactionBufferProvider";
 
     @FieldContext(
             category = CATEGORY_TRANSACTION,
-            doc = "Class name for transaction pending ack store provider"
+            doc = "Class name for transaction pending ack store provider. The 
default"
+                    + " DispatchingTransactionPendingAckStoreProvider routes 
subscriptions on segment:// topics"
+                    + " to the metadata-driven MetadataPendingAckStore 
(PIP-473) and others to the legacy"
+                    + " MLPendingAckStore. Set this to"
+                    + " 
org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider 
to force"
+                    + " the legacy store for all subscriptions."
     )
     private String transactionPendingAckStoreProviderClassName =
-            
"org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider";
+            
"org.apache.pulsar.broker.transaction.pendingack.impl.DispatchingTransactionPendingAckStoreProvider";
 
     @FieldContext(
             category = CATEGORY_TRANSACTION,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 00253bbb431..8c2dfe8d781 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -3446,7 +3446,12 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
             return;
         }
 
-        if 
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
 {
+        if (command.isScalable()) {
+            if (!isScalableTcAvailable()) {
+                commandSender.sendTcClientConnectResponse(requestId, 
ServerError.NotAllowedError,
+                        "Scalable-topics transaction coordinator is not 
enabled on this broker");
+                return;
+            }
             
service.pulsar().getTransactionCoordinatorV5().handleClientConnect(tcId)
                     .whenComplete((__, e) -> {
                         if (e == null) {
@@ -3492,6 +3497,16 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return true;
         }
     }
+    /**
+     * @return true if the scalable-topics (PIP-473) transaction coordinator 
is enabled and ready on
+     *     this broker. Transaction commands carrying {@code scalable=true} 
route to it; commands
+     *     without the flag always go to the legacy coordinator, so v4 and v5 
clients coexist.
+     */
+    private boolean isScalableTcAvailable() {
+        return 
service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()
+                && service.getPulsar().getTransactionCoordinatorV5() != null;
+    }
+
     private Throwable handleTxnException(Throwable ex, String op, long 
requestId) {
         Throwable cause = FutureUtil.unwrapCompletionException(ex);
         if (cause instanceof 
CoordinatorException.CoordinatorNotFoundException) {
@@ -3527,7 +3542,12 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
-        if 
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
 {
+        if (command.isScalable()) {
+            if (!isScalableTcAvailable()) {
+                commandSender.sendNewTxnErrorResponse(requestId, tcId.getId(), 
ServerError.NotAllowedError,
+                        "Scalable-topics transaction coordinator is not 
enabled on this broker");
+                return;
+            }
             final String v5Owner = getPrincipal();
             service.pulsar().getTransactionCoordinatorV5()
                     .newTransaction(tcId, command.getTxnTtlSeconds() * 1000L, 
v5Owner)
@@ -3594,11 +3614,17 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
-        if 
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
 {
+        if (command.isScalable()) {
+            if (!isScalableTcAvailable()) {
+                writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, 
txnID.getLeastSigBits(),
+                        txnID.getMostSigBits(), ServerError.NotAllowedError,
+                        "Scalable-topics transaction coordinator is not 
enabled on this broker"));
+                return;
+            }
             // v5: TC doesn't need pre-registration — participants advertise 
themselves by writing
             // /txn/op records when they actually apply ops. Still verify 
ownership before acking,
             // matching the legacy authorization surface.
-            verifyTxnOwnership(txnID)
+            verifyTxnOwnership(txnID, true)
                     .thenCompose(isOwner -> isOwner ? 
CompletableFuture.<Void>completedFuture(null)
                             : failedFutureTxnNotOwned(txnID))
                     .whenComplete((v, ex) -> {
@@ -3618,7 +3644,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
-        verifyTxnOwnership(txnID)
+        verifyTxnOwnership(txnID, false)
                 .thenCompose(isOwner -> {
                     if (!isOwner) {
                         return failedFutureTxnNotOwned(txnID);
@@ -3676,8 +3702,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
-        if 
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
 {
-            verifyTxnOwnership(txnID)
+        if (command.isScalable()) {
+            if (!isScalableTcAvailable()) {
+                commandSender.sendEndTxnErrorResponse(requestId, txnID, 
ServerError.NotAllowedError,
+                        "Scalable-topics transaction coordinator is not 
enabled on this broker");
+                return;
+            }
+            verifyTxnOwnership(txnID, true)
                     .thenCompose(isOwner -> {
                         if (!isOwner) {
                             return failedFutureTxnNotOwned(txnID);
@@ -3700,7 +3731,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
 
-        verifyTxnOwnership(txnID)
+        verifyTxnOwnership(txnID, false)
                 .thenCompose(isOwner -> {
                     if (!isOwner) {
                         return failedFutureTxnNotOwned(txnID);
@@ -3739,14 +3770,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
     }
 
-    private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID) {
+    private CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID, boolean 
scalable) {
         assert ctx.executor().inEventLoop();
-        CompletableFuture<Boolean> ownerCheck =
-                
service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()
-                        ? service.pulsar().getTransactionCoordinatorV5()
-                                .verifyTxnOwnership(txnID, getPrincipal())
-                        : service.pulsar().getTransactionMetadataStoreService()
-                                .verifyTxnOwnership(txnID, getPrincipal());
+        CompletableFuture<Boolean> ownerCheck = scalable
+                ? service.pulsar().getTransactionCoordinatorV5()
+                        .verifyTxnOwnership(txnID, getPrincipal())
+                : service.pulsar().getTransactionMetadataStoreService()
+                        .verifyTxnOwnership(txnID, getPrincipal());
         return ownerCheck
                 .thenComposeAsync(isOwner -> {
                     if (isOwner) {
@@ -4016,11 +4046,17 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             return;
         }
 
-        if 
(service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled())
 {
+        if (command.isScalable()) {
+            if (!isScalableTcAvailable()) {
+                
writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, 
txnID.getLeastSigBits(),
+                        txnID.getMostSigBits(), ServerError.NotAllowedError,
+                        "Scalable-topics transaction coordinator is not 
enabled on this broker"));
+                return;
+            }
             // v5: TC doesn't need pre-registration — participants advertise 
themselves by writing
             // /txn/op records when they actually apply ops. Still verify 
ownership before acking,
             // matching the legacy authorization surface.
-            verifyTxnOwnership(txnID)
+            verifyTxnOwnership(txnID, true)
                     .thenCompose(isOwner -> isOwner ? 
CompletableFuture.<Void>completedFuture(null)
                             : failedFutureTxnNotOwned(txnID))
                     .whenComplete((v, ex) -> {
@@ -4041,7 +4077,7 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         TransactionMetadataStoreService transactionMetadataStoreService =
                 service.pulsar().getTransactionMetadataStoreService();
 
-        verifyTxnOwnership(txnID)
+        verifyTxnOwnership(txnID, false)
                 .thenCompose(isOwner -> {
                     if (!isOwner) {
                         return failedFutureTxnNotOwned(txnID);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
index 845c1c0328a..4ac6b89ae51 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/MetadataTransactionBuffer.java
@@ -119,9 +119,6 @@ public class MetadataTransactionBuffer implements TransactionBuffer {
     /** Version of the durable watermark record; -1 if it doesn't exist yet. */
     private long watermarkVersion = -1L;
 
-    /** Latest dispatched position from non-txn publishes — the natural 
ceiling when no opens pin. */
-    private Position lastDispatchable;
-
     /** Current maxReadPosition; never moves above the watermark while 
recovery-discovered opens exist. */
     private Position maxReadPosition;
 
@@ -141,7 +138,6 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
         this.segmentName = topic.getName();
         this.maxReadPositionCallBack = topic.getMaxReadPositionCallBack();
         this.maxReadPosition = ledger.getLastConfirmedEntry();
-        this.lastDispatchable = this.maxReadPosition;
         recover();
     }
 
@@ -544,18 +540,28 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
             next = watermarkPos != null ? watermarkPos : maxReadPosition;
         } else {
             Position min = null;
+            boolean anyOpen = false;
             for (TxnEntry e : txns.values()) {
-                if (e.state == TxnState.OPEN && e.firstPosition != null) {
-                    if (min == null || e.firstPosition.compareTo(min) < 0) {
+                if (e.state == TxnState.OPEN) {
+                    anyOpen = true;
+                    if (e.firstPosition != null
+                            && (min == null || e.firstPosition.compareTo(min) 
< 0)) {
                         min = e.firstPosition;
                     }
                 }
             }
             if (min != null) {
+                // Pin just below the lowest open txn's first write.
                 next = ledger.getPreviousPosition(min);
+            } else if (anyOpen) {
+                // Open txn(s) whose first write isn't tracked yet (an append 
is in flight between
+                // the /txn/op record and the ledger entry): hold the current 
ceiling rather than
+                // risk exposing the in-flight entry.
+                next = maxReadPosition;
             } else {
-                // No open txns pinning anything: free to advance to 
last-dispatched.
-                next = lastDispatchable;
+                // No open txns: every written entry is resolved — committed 
data is visible and
+                // aborted data is filtered by isTxnAborted — so advance to 
the last written entry.
+                next = ledger.getLastConfirmedEntry();
             }
         }
         Position prev = maxReadPosition;
@@ -593,7 +599,6 @@ public class MetadataTransactionBuffer implements 
TransactionBuffer {
         }
         topic.updateLastDispatchablePosition(position);
         synchronized (lock) {
-            lastDispatchable = position;
             recomputeMaxReadPositionLocked();
             // Persist the new watermark if it advanced as a result of the 
non-txn append.
             stateTail = stateTail.thenCompose(__ -> 
persistWatermarkIfAdvanced())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
index fba4ec44361..439b75ecab2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MetadataPendingAckStore.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService;
 import lombok.CustomLog;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.transaction.metadata.TxnIds;
 import org.apache.pulsar.broker.transaction.metadata.TxnMetadataStore;
@@ -165,7 +166,21 @@ public class MetadataPendingAckStore implements 
PendingAckStore {
                 return;
             }
             recoveryFuture.complete(null);
-            pendingAckHandle.changeToReadyState();
+            // Mirror the legacy MLPendingAckStore completion: flip the handle 
to Ready and
+            // complete the handle future — PersistentSubscription.addConsumer 
blocks on that
+            // future, so skipping it hangs every subscribe on a segment topic 
— then drain any
+            // ack requests queued during recovery. Run on the pinned executor 
so the
+            // state-machine transition and the cache drain stay 
single-threaded.
+            executorService.execute(() -> {
+                if (pendingAckHandle.changeToReadyState()) {
+                    pendingAckHandle.completeHandleFuture();
+                } else {
+                    pendingAckHandle.exceptionHandleFuture(
+                            new 
BrokerServiceException.ServiceUnitNotReadyException(
+                                    "Failed to change PendingAckHandle state 
to Ready"));
+                }
+                pendingAckHandle.handleCacheRequest();
+            });
             // Drain any events that fired during recovery.
             triggerReconcile();
         });
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
index ffd3a40a814..5c66c6d0e7c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckPersistentTest.java
@@ -166,8 +166,11 @@ public class PendingAckPersistentTest extends TransactionTestBase {
         when(mockProvider.newPendingAckStore(any()))
                 // First, the method newPendingAckStore will fail with a 
retryable exception.
                 .thenReturn(FutureUtil.failedFuture(new 
ManagedLedgerException("mock fail new store")))
-                // Then, the method will be executed successfully.
-                .thenCallRealMethod();
+                // Then, the method will be executed successfully. Delegate to 
the real provider
+                // rather than thenCallRealMethod(): the configured provider 
is now the dispatching
+                // provider, and a Mockito mock of it has null delegate 
fields, so calling its real
+                // method would NPE. The original real provider behaves 
identically for this topic.
+                .thenAnswer(invocation -> 
pendingAckStoreProvider.newPendingAckStore(invocation.getArgument(0)));
         transactionPendingAckStoreProviderField.set(pulsarServiceList.get(0), 
mockProvider);
         Consumer<byte[]> consumer3 = pulsarClient.newConsumer()
                 .subscriptionName("subName3")
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
index 8104c4bbc36..05dba76c00d 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientBuilderV5.java
@@ -43,6 +43,10 @@ final class PulsarClientBuilderV5 implements PulsarClientBuilder {
 
     PulsarClientBuilderV5() {
         conf.setStatsIntervalSeconds(0);
+        // v5 SDK transactions use the metadata-store (PIP-473) coordinator. 
This internal flag
+        // routes the underlying v4 TC client to it, keeping v5 transactions 
independent from any
+        // v4 SDK clients (which use the legacy coordinator) on the same 
cluster.
+        conf.setScalableTransactions(true);
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index ae52cfd0550..64dd7ef4344 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -96,6 +96,7 @@ public class TransactionMetaStoreHandler extends HandlerState
     // whether it must be reached through the proxy. Null leaderUri means 
assign-topic mode.
     private volatile URI leaderUri;
     private volatile boolean useProxy;
+    private final boolean scalable;
 
 
 
@@ -123,6 +124,10 @@ public class TransactionMetaStoreHandler extends 
HandlerState
         super(pulsarClient, topic);
         this.leaderUri = leaderUri;
         this.useProxy = useProxy;
+        // A handler built with a fixed leader URI is a v5 (metadata-store 
discovery) handler; one
+        // built with a topic name is a legacy v4 handler. The flag routes 
each command to the
+        // matching coordinator on the broker so v4 and v5 clients coexist.
+        this.scalable = leaderUri != null;
         this.transactionCoordinatorId = transactionCoordinatorId;
         this.timeoutQueue = new ConcurrentLinkedQueue<>();
         this.blockIfReachMaxPendingOps = true;
@@ -199,7 +204,8 @@ public class TransactionMetaStoreHandler extends 
HandlerState
             // if broker protocol version < 19, don't send 
TcClientConnectRequest to broker.
             if (cnx.getRemoteEndpointProtocolVersion() > 
ProtocolVersion.v18.getValue()) {
                 long requestId = client.newRequestId();
-                ByteBuf request = 
Commands.newTcClientConnectRequest(transactionCoordinatorId, requestId);
+                ByteBuf request =
+                        
Commands.newTcClientConnectRequest(transactionCoordinatorId, requestId, 
scalable);
 
                 cnx.sendRequestWithId(request, requestId).thenRun(() -> {
                     internalPinnedExecutor.execute(() -> {
@@ -274,7 +280,7 @@ public class TransactionMetaStoreHandler extends 
HandlerState
             return callback;
         }
         long requestId = client.newRequestId();
-        ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, 
unit.toMillis(timeout));
+        ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, 
unit.toMillis(timeout), scalable);
         String description = String.format("Create new transaction %s", 
transactionCoordinatorId);
         OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback, 
client, description, cnx());
         internalPinnedExecutor.execute(() -> {
@@ -352,7 +358,7 @@ public class TransactionMetaStoreHandler extends 
HandlerState
         }
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newAddPartitionToTxn(
-                requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), 
partitions);
+                requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), 
partitions, scalable);
         String description = String.format("Add partition %s to TXN %s", 
String.valueOf(partitions),
                 String.valueOf(txnID));
         OpForVoidCallBack op = OpForVoidCallBack
@@ -435,7 +441,7 @@ public class TransactionMetaStoreHandler extends 
HandlerState
         }
         long requestId = client.newRequestId();
         ByteBuf cmd = Commands.newAddSubscriptionToTxn(
-                requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), 
subscriptionList);
+                requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), 
subscriptionList, scalable);
         String description = String.format("Add subscription %s to TXN %s", 
toStringSubscriptionList(subscriptionList),
                 String.valueOf(txnID));
         OpForVoidCallBack op = OpForVoidCallBack.create(cmd, callback, client, 
description, cnx());
@@ -526,7 +532,8 @@ public class TransactionMetaStoreHandler extends 
HandlerState
             return callback;
         }
         long requestId = client.newRequestId();
-        BaseCommand cmd = Commands.newEndTxn(requestId, 
txnID.getLeastSigBits(), txnID.getMostSigBits(), action);
+        BaseCommand cmd = Commands.newEndTxn(requestId, 
txnID.getLeastSigBits(), txnID.getMostSigBits(),
+                action, scalable);
         ByteBuf buf = Commands.serializeWithSize(cmd);
         String description = String.format("End [%s] TXN %s", 
String.valueOf(action), String.valueOf(txnID));
         OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client, 
description, cnx());
@@ -756,6 +763,14 @@ public class TransactionMetaStoreHandler extends 
HandlerState
                 }
                 return true;
             case Connecting:
+            case Uninitialized:
+                // Not connected yet, but the handler is (or will be) 
establishing the connection. For
+                // the metadata-store coordinator the partition leader is 
still being resolved via the
+                // assignment watch, so the handler can sit in Uninitialized 
briefly after the client is
+                // built. Leave the op queued in pendingRequests; it is 
retried from connectionOpened
+                // once the handler is Ready, and the operation-timeout sweep 
fails it if the connection
+                // never comes. Failing fast here would make a freshly-built 
client's first request
+                // race the asynchronous connect.
                 return true;
             case Closing:
             case Closed:
@@ -767,7 +782,6 @@ public class TransactionMetaStoreHandler extends 
HandlerState
                 onResponse(op);
                 return false;
             case Failed:
-            case Uninitialized:
                 op.callback.completeExceptionally(
                         new 
TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException(
                                 "Transaction meta store handler for tcId "
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
index 1c282198eb0..e1c747d18aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -85,6 +85,13 @@ public class ClientConfigurationData implements Serializable, Cloneable {
     )
     private long serviceUrlQuarantineMaxDurationMs = TimeUnit.DAYS.toMillis(1);
 
+    // Internal: set by the v5 SDK (PulsarClientBuilderV5), not exposed on the 
public ClientBuilder.
+    // When true, transactions use the metadata-driven (PIP-473) coordinator; 
when false (v4 SDK),
+    // they use the legacy coordinator. Routes coexistence at the TC layer by 
client/SDK kind rather
+    // than broker capability, so a v4 client keeps using the legacy TC even 
on a v5-enabled cluster.
+    @JsonIgnore
+    private boolean scalableTransactions = false;
+
     @Schema(
             name = "authentication",
             description = "Authentication settings of the client."
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 6fe938d2d62..13e8a5ca2ec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.CustomLog;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
 import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.CoordinatorClientStateException;
@@ -86,35 +87,25 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
     }
 
     /**
-     * Choose the discovery strategy. The metadata-store assignment watch 
needs a binary-protocol
-     * connection, so it's only usable when the client is configured with a 
{@code pulsar://}
-     * service URL; with an {@code http://} service URL we always use the 
assign-topic flow (which
-     * resolves coordinators via the admin/HTTP-capable partitioned-metadata 
lookup). When binary
-     * lookup is available, probe the broker's {@code 
supports_tc_metadata_discovery} feature flag to
-     * decide; if the broker doesn't advertise it (old broker, or 
scalable-topics TC disabled), fall
-     * back to the assign-topic flow.
+     * Choose the discovery strategy by client/SDK kind, not by broker 
capability. A v5 SDK client
+     * sets the internal {@code scalableTransactions} config flag and uses the 
metadata-store
+     * coordinator (assignment watch); a v4 SDK client leaves it unset and 
uses the legacy
+     * assign-topic coordinator. This keeps v4 and v5 transactions independent 
on the same cluster:
+     * flipping the broker default to enable the v5 TC must not silently 
re-route v4 clients to it,
+     * since the v5 TC notifies participants via metadata-store events that 
the legacy transaction
+     * buffer / pending-ack store don't consume.
      */
     private CompletableFuture<TcDiscovery> selectDiscovery() {
-        if (!pulsarClient.getLookup().isBinaryProtoLookupService()) {
+        if (!pulsarClient.getConfiguration().isScalableTransactions()) {
             return CompletableFuture.completedFuture(new 
AssignTopicTcDiscovery(pulsarClient));
         }
-        // Probe a broker connection to read the feature flag. Use 
getAnyBrokerProxyConnection() (not
-        // getConnectionToServiceUrl()): when connecting through a proxy, the 
latter yields the proxy's
-        // own CONNECTED, which carries the proxy lookup handshake's flags 
rather than a broker's;
-        // getAnyBrokerProxyConnection() pairs to an actual broker (directly 
or proxied) so the
-        // forwarded feature flags reflect the broker — the same connection 
the watch itself uses.
-        // If the probe fails, fall back to the assign-topic flow, whose 
lookup retries across hosts
-        // and still works against v5 brokers (the assign topic exists during 
the deprecation window),
-        // so falling back is always safe.
-        return pulsarClient.getAnyBrokerProxyConnection()
-                .thenApply(cnx -> cnx.isSupportsTcMetadataDiscovery()
-                        ? (TcDiscovery) new 
WatchTcAssignmentsDiscovery(pulsarClient)
-                        : new AssignTopicTcDiscovery(pulsarClient))
-                .exceptionally(ex -> {
-                    log.info().exception(ex)
-                            .log("TC discovery feature probe failed; using 
assign-topic discovery");
-                    return new AssignTopicTcDiscovery(pulsarClient);
-                });
+        // The metadata-store assignment watch needs a binary connection. A v5 
client on an
+        // http:// service URL is a misconfiguration — fail clearly rather 
than silently downgrade.
+        if (!pulsarClient.getLookup().isBinaryProtoLookupService()) {
+            return FutureUtil.failedFuture(new 
PulsarClientException.InvalidServiceURL(
+                    "Scalable-topics transactions require a pulsar:// service 
URL", null));
+        }
+        return CompletableFuture.completedFuture(new 
WatchTcAssignmentsDiscovery(pulsarClient));
     }
 
     @Override
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 3b8307dcfcb..80b10f47890 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -724,8 +724,12 @@ public class Commands {
     }
 
     public static ByteBuf newTcClientConnectRequest(long tcId, long requestId) 
{
+        return newTcClientConnectRequest(tcId, requestId, false);
+    }
+
+    public static ByteBuf newTcClientConnectRequest(long tcId, long requestId, 
boolean scalable) {
         BaseCommand cmd = localCmd(Type.TC_CLIENT_CONNECT_REQUEST);
-        cmd.setTcClientConnectRequest().setTcId(tcId).setRequestId(requestId);
+        
cmd.setTcClientConnectRequest().setTcId(tcId).setRequestId(requestId).setScalable(scalable);
         return serializeWithSize(cmd);
     }
 
@@ -1432,11 +1436,16 @@ public class Commands {
     // ---- transaction related ----
 
     public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds) {
+        return newTxn(tcId, requestId, ttlSeconds, false);
+    }
+
+    public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds, 
boolean scalable) {
         BaseCommand cmd = localCmd(Type.NEW_TXN);
         cmd.setNewTxn()
                 .setTcId(tcId)
                 .setRequestId(requestId)
-                .setTxnTtlSeconds(ttlSeconds);
+                .setTxnTtlSeconds(ttlSeconds)
+                .setScalable(scalable);
         return serializeWithSize(cmd);
     }
 
@@ -1463,11 +1472,17 @@ public class Commands {
 
     public static ByteBuf newAddPartitionToTxn(long requestId, long 
txnIdLeastBits, long txnIdMostBits,
                                                List<String> partitions) {
+        return newAddPartitionToTxn(requestId, txnIdLeastBits, txnIdMostBits, 
partitions, false);
+    }
+
+    public static ByteBuf newAddPartitionToTxn(long requestId, long 
txnIdLeastBits, long txnIdMostBits,
+                                               List<String> partitions, 
boolean scalable) {
         BaseCommand cmd = localCmd(Type.ADD_PARTITION_TO_TXN);
         CommandAddPartitionToTxn req = cmd.setAddPartitionToTxn()
                 .setRequestId(requestId)
                 .setTxnidLeastBits(txnIdLeastBits)
-                .setTxnidMostBits(txnIdMostBits);
+                .setTxnidMostBits(txnIdMostBits)
+                .setScalable(scalable);
         if (partitions != null) {
             partitions.forEach(req::addPartition);
         }
@@ -1503,11 +1518,17 @@ public class Commands {
 
     public static ByteBuf newAddSubscriptionToTxn(long requestId, long 
txnIdLeastBits, long txnIdMostBits,
             List<Subscription> subscriptions) {
+        return newAddSubscriptionToTxn(requestId, txnIdLeastBits, 
txnIdMostBits, subscriptions, false);
+    }
+
+    public static ByteBuf newAddSubscriptionToTxn(long requestId, long 
txnIdLeastBits, long txnIdMostBits,
+            List<Subscription> subscriptions, boolean scalable) {
         BaseCommand cmd = localCmd(Type.ADD_SUBSCRIPTION_TO_TXN);
         CommandAddSubscriptionToTxn add = cmd.setAddSubscriptionToTxn()
                 .setRequestId(requestId)
                 .setTxnidLeastBits(txnIdLeastBits)
-                .setTxnidMostBits(txnIdMostBits);
+                .setTxnidMostBits(txnIdMostBits)
+                .setScalable(scalable);
         subscriptions.forEach(s -> add.addSubscription().copyFrom(s));
         return serializeWithSize(cmd);
     }
@@ -1536,11 +1557,17 @@ public class Commands {
     }
 
     public static BaseCommand newEndTxn(long requestId, long txnIdLeastBits, 
long txnIdMostBits, TxnAction txnAction) {
+        return newEndTxn(requestId, txnIdLeastBits, txnIdMostBits, txnAction, 
false);
+    }
+
+    public static BaseCommand newEndTxn(long requestId, long txnIdLeastBits, 
long txnIdMostBits, TxnAction txnAction,
+                                        boolean scalable) {
         BaseCommand cmd = localCmd(Type.END_TXN);
         cmd.setEndTxn()
                 .setRequestId(requestId)
                 
.setTxnidLeastBits(txnIdLeastBits).setTxnidMostBits(txnIdMostBits)
-                .setTxnAction(txnAction);
+                .setTxnAction(txnAction)
+                .setScalable(scalable);
         return cmd;
     }
 
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 76cd8d382ab..f769512660c 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -1097,6 +1097,8 @@ enum TxnAction {
 message CommandTcClientConnectRequest {
     required uint64 request_id = 1;
     required uint64 tc_id = 2 [default = 0];
+    // Route to the scalable-topics (PIP-473) coordinator. See 
CommandNewTxn.scalable.
+    optional bool scalable = 3 [default = false];
 }
 
 message CommandTcClientConnectResponse {
@@ -1109,6 +1111,10 @@ message CommandNewTxn {
     required uint64 request_id = 1;
     optional uint64 txn_ttl_seconds = 2 [default = 0];
     optional uint64 tc_id = 3 [default = 0];
+    // When true, route to the metadata-driven (scalable-topics, PIP-473) 
transaction coordinator
+    // instead of the legacy one. Set by v5 clients; absent for v4 clients. 
Lets both coordinators
+    // serve their own clients on the same cluster.
+    optional bool scalable = 4 [default = false];
 }
 
 message CommandNewTxnResponse {
@@ -1124,6 +1130,8 @@ message CommandAddPartitionToTxn {
     optional uint64 txnid_least_bits = 2 [default = 0];
     optional uint64 txnid_most_bits = 3 [default = 0];
     repeated string partitions = 4;
+    // Route to the scalable-topics (PIP-473) coordinator. See 
CommandNewTxn.scalable.
+    optional bool scalable = 5 [default = false];
 }
 
 message CommandAddPartitionToTxnResponse {
@@ -1143,6 +1151,8 @@ message CommandAddSubscriptionToTxn {
     optional uint64 txnid_least_bits = 2 [default = 0];
     optional uint64 txnid_most_bits = 3 [default = 0];
     repeated Subscription subscription = 4;
+    // Route to the scalable-topics (PIP-473) coordinator. See 
CommandNewTxn.scalable.
+    optional bool scalable = 5 [default = false];
 }
 
 message CommandAddSubscriptionToTxnResponse {
@@ -1158,6 +1168,8 @@ message CommandEndTxn {
     optional uint64 txnid_least_bits = 2 [default = 0];
     optional uint64 txnid_most_bits = 3 [default = 0];
     optional TxnAction txn_action = 4;
+    // Route to the scalable-topics (PIP-473) coordinator. See 
CommandNewTxn.scalable.
+    optional bool scalable = 5 [default = false];
 }
 
 message CommandEndTxnResponse {
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTxnFlagTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTxnFlagTest.java
new file mode 100644
index 00000000000..fa650992c13
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsScalableTxnFlagTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import java.util.List;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.TxnAction;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the {@code scalable} routing flag on the transaction commands (PIP-473 coexistence), checked in
+ * both states: a v5 client sets it so the broker routes the command to the metadata-store coordinator; the
+ * legacy overloads used by a v4 client encode it as false, which routes to the legacy coordinator. Commands built
+ * as a {@code ByteBuf} frame are reparsed from the wire; END_TXN is built as a {@code BaseCommand} and read directly.
+ */
+public class CommandsScalableTxnFlagTest {
+
+    private static BaseCommand parseFrame(ByteBuf frame) {
+        try {
+            frame.skipBytes(4); // total size
+            int cmdSize = (int) frame.readUnsignedInt(); // length prefix of the command that follows
+            BaseCommand cmd = new BaseCommand();
+            cmd.parseFrom(frame, cmdSize);
+            cmd.materialize();
+            return cmd;
+        } finally {
+            frame.release(); // always release the frame, even when parsing throws
+        }
+    }
+
+    @Test
+    public void tcClientConnectCarriesScalable() {
+        assertTrue(parseFrame(Commands.newTcClientConnectRequest(1L, 2L, true))
+                .getTcClientConnectRequest().isScalable());
+        assertFalse(parseFrame(Commands.newTcClientConnectRequest(1L, 2L))
+                .getTcClientConnectRequest().isScalable());
+    }
+
+    @Test
+    public void newTxnCarriesScalable() {
+        assertTrue(parseFrame(Commands.newTxn(0L, 1L, 60_000L, 
true)).getNewTxn().isScalable());
+        assertFalse(parseFrame(Commands.newTxn(0L, 1L, 
60_000L)).getNewTxn().isScalable());
+    }
+
+    @Test
+    public void endTxnCarriesScalable() {
+        assertTrue(Commands.newEndTxn(1L, 2L, 0L, TxnAction.COMMIT, 
true).getEndTxn().isScalable());
+        assertFalse(Commands.newEndTxn(1L, 2L, 0L, 
TxnAction.COMMIT).getEndTxn().isScalable());
+    }
+
+    @Test
+    public void addPartitionCarriesScalable() {
+        assertTrue(parseFrame(Commands.newAddPartitionToTxn(1L, 2L, 0L, 
List.of("t"), true))
+                .getAddPartitionToTxn().isScalable());
+        assertFalse(parseFrame(Commands.newAddPartitionToTxn(1L, 2L, 0L, 
List.of("t")))
+                .getAddPartitionToTxn().isScalable());
+    }
+
+    @Test
+    public void addSubscriptionCarriesScalable() {
+        assertTrue(parseFrame(Commands.newAddSubscriptionToTxn(1L, 2L, 0L, 
List.of(), true))
+                .getAddSubscriptionToTxn().isScalable());
+        assertFalse(parseFrame(Commands.newAddSubscriptionToTxn(1L, 2L, 0L, 
List.of()))
+                .getAddSubscriptionToTxn().isScalable());
+    }
+
+    @Test
+    public void defaultIsFalse() {
+        // The 3-arg overload (legacy/v4 client path) sets scalable=false explicitly; it must read back false.
+        assertEquals(parseFrame(Commands.newTxn(0L, 1L, 
60_000L)).getNewTxn().isScalable(), false);
+    }
+}
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
index d8528c3e593..633b7c895b4 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerfClientUtils.java
@@ -291,6 +291,32 @@ public class PerfClientUtils {
         }
     }
 
+    /**
+     * Open a transaction on the V5 client, retrying briefly while the transaction-coordinator handler finishes its
+     * asynchronous connect. The first {@code newTransaction()} right after the client is built can race that connect
+     * and fail with {@code MetaStoreHandlerNotReadyException}; the perf tools open their initial transaction before
+     * building producers/consumers, so they hit this window (a tool that builds participants first gives the handler
+     * time to connect). Any non-interrupt {@code PulsarClientException} is retried every 200 ms for 30 s.
+     *
+     * @param client the V5 client to open the transaction on
+     * @return a new transaction once the coordinator is ready
+     */
+    public static org.apache.pulsar.client.api.v5.Transaction newTransactionWithRetry(
+            org.apache.pulsar.client.api.v5.PulsarClient client)
+            throws org.apache.pulsar.client.api.v5.PulsarClientException, InterruptedException {
+        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30); // monotonic: immune to clock steps
+        while (true) {
+            try {
+                return client.newTransaction();
+            } catch (org.apache.pulsar.client.api.v5.PulsarClientException e) {
+                if (System.nanoTime() - deadline > 0 || hasInterruptedException(e)) { // overflow-safe compare
+                    throw e;
+                }
+                Thread.sleep(200);
+            }
+        }
+    }
+
     /**
      * Check if the throwable or any of its causes is an InterruptedException.
      *
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index cc4f2106d4c..6c0f101edd8 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -288,7 +288,7 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
 
         AtomicReference<Transaction> atomicReference;
         if (this.isEnableTransaction) {
-            atomicReference = new 
AtomicReference<>(pulsarClient.newTransaction());
+            atomicReference = new 
AtomicReference<>(PerfClientUtils.newTransactionWithRetry(pulsarClient));
         } else {
             atomicReference = new AtomicReference<>(null);
         }
@@ -475,6 +475,24 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
                           Thread mainThread,
                           PulsarClient pulsarClient) {
         while (!Thread.currentThread().isInterrupted()) {
+            // Termination conditions that don't depend on having just 
received a message. With
+            // asynchronous transaction commits the final commit can land 
after the last available
+            // message is consumed, so the transaction count must be 
re-checked on idle receives too;
+            // otherwise the consumer waits forever for a message that will 
never arrive.
+            if (this.testTime > 0 && System.nanoTime() > testEndTime) {
+                log.info("------------------- DONE -----------------------");
+                PerfClientUtils.exit(0);
+                mainThread.interrupt();
+                return;
+            }
+            if (this.totalNumTxn > 0
+                    && totalEndTxnOpFailNum.sum() + 
totalEndTxnOpSuccessNum.sum() >= this.totalNumTxn) {
+                log.info("------------------- DONE -----------------------");
+                PerfClientUtils.exit(0);
+                mainThread.interrupt();
+                return;
+            }
+
             Message<byte[]> msg;
             try {
                 msg = consumer.receive(Duration.ofSeconds(1));
@@ -490,19 +508,6 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
                 continue;
             }
 
-            if (this.testTime > 0 && System.nanoTime() > testEndTime) {
-                log.info("------------------- DONE -----------------------");
-                PerfClientUtils.exit(0);
-                mainThread.interrupt();
-                return;
-            }
-            if (this.totalNumTxn > 0
-                    && totalEndTxnOpFailNum.sum() + 
totalEndTxnOpSuccessNum.sum() >= this.totalNumTxn) {
-                log.info("------------------- DONE -----------------------");
-                PerfClientUtils.exit(0);
-                mainThread.interrupt();
-                return;
-            }
             messagesReceived.increment();
             bytesReceived.add(msg.size());
             totalMessagesReceived.increment();
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 2978456b8b2..05b82a7e156 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -545,7 +545,8 @@ public class PerformanceProducer extends 
PerformanceTopicListArguments{
 
             AtomicReference<Transaction> transactionAtomicReference;
             if (this.isEnableTransaction) {
-                transactionAtomicReference = new 
AtomicReference<>(client.newTransaction());
+                transactionAtomicReference = new AtomicReference<>(
+                        PerfClientUtils.newTransactionWithRetry(client));
             } else {
                 transactionAtomicReference = new AtomicReference<>(null);
             }
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
index 73c1f79497c..7a84f7894a3 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceTransaction.java
@@ -121,6 +121,15 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
             + "not trying to create a topic")
     public Integer partitions = null;
 
+    @Option(names = {"--scalable"}, description = "Create the producer/consumer topics as scalable"
+            + " topics (PIP-473) with --scalable-segments initial segments. Required for transactions"
+            + " against the scalable-topics (v5) coordinator. When set, --partitions is ignored.")
+    public boolean scalable = false; // topic creation checks this before --partitions; nothing rejects both
+
+    @Option(names = {"--scalable-segments"}, description = "Number of initial 
segments for scalable"
+            + " topics created via --scalable.")
+    public int scalableSegments = 1;
+
     @Option(names = {"-time",
             "--test-duration"}, description = "Test duration (in second). 0 
means keeping publishing")
     public long testTime = 0;
@@ -208,7 +217,27 @@ public class PerformanceTransaction extends 
PerformanceBaseArguments{
         for (int i = 0; i < payloadBytes.length; ++i) {
             payloadBytes[i] = (byte) (random.nextInt(26) + 65);
         }
-        if (this.partitions != null) {
+        if (this.scalable) {
+            // Scalable topics (PIP-473) must be pre-created via the admin API 
— they don't
+            // auto-create on produce. Create both the produce and consume 
topics so a
+            // transaction against the scalable-topics coordinator has segment 
participants.
+            final PulsarAdminBuilder adminBuilder = PerfClientUtils
+                    .createAdminBuilderFromArguments(this, this.adminURL);
+            try (PulsarAdmin adminClient = adminBuilder.build()) {
+                List<String> allTopics = new ArrayList<>(this.producerTopic);
+                allTopics.addAll(this.consumerTopic);
+                for (String topic : allTopics) {
+                    try {
+                        
adminClient.scalableTopics().createScalableTopic(topic, this.scalableSegments);
+                        log.info().attr("topic", topic).attr("segments", 
this.scalableSegments)
+                                .log("Created scalable topic");
+                    } catch (PulsarAdminException.ConflictException 
alreadyExists) {
+                        log.debug().attr("topic", topic).attr("exists", 
alreadyExists)
+                                .log("Scalable topic already exists");
+                    }
+                }
+            }
+        } else if (this.partitions != null) {
             final PulsarAdminBuilder adminBuilder = PerfClientUtils
                     .createAdminBuilderFromArguments(this, this.adminURL);
 
diff --git 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
index e6a5e195e4c..e8867fb5ddd 100644
--- 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
+++ 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/Oauth2PerformanceTransactionTest.java
@@ -25,25 +25,24 @@ import java.net.URI;
 import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import lombok.CustomLog;
 import org.apache.pulsar.broker.auth.MockOIDCIdentityProvider;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
 import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -60,13 +59,17 @@ public class Oauth2PerformanceTransactionTest extends 
ProducerConsumerBase {
     private final String testTenant = "pulsar";
     private final String testNamespace = "perf";
     private final String myNamespace = testTenant + "/" + testNamespace;
-    private final String testTopic = "persistent://" + myNamespace + "/test-";
+    // v5 transactions are scalable-topic-only; scalable topics use the 
topic:// domain.
+    private final String testTopic = "topic://" + myNamespace + "/test-";
 
     // Credentials File, which contains "client_id" and "client_secret"
     private static final String CREDENTIALS_FILE = 
"./src/test/resources/authentication/token/credentials_file.json";
 
     private final String authenticationPlugin = 
"org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2";
 
+    // v5 SDK client for the verification produce/consume (v4 SDK can't use 
scalable topics).
+    private PulsarClient v5Client;
+
     private MockOIDCIdentityProvider server;
     private String authenticationParameters;
 
@@ -118,6 +121,10 @@ public class Oauth2PerformanceTransactionTest extends 
ProducerConsumerBase {
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
+        if (v5Client != null) {
+            v5Client.close();
+            v5Client = null;
+        }
         super.internalCleanup();
         server.stop();
     }
@@ -146,15 +153,19 @@ public class Oauth2PerformanceTransactionTest extends 
ProducerConsumerBase {
                 
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
                         new PartitionedTopicMetadata(1));
 
-        replacePulsarClient(PulsarClient.builder().serviceUrl(new 
URI(pulsar.getBrokerServiceUrl()).toString())
-                .statsInterval(0, TimeUnit.SECONDS)
-                .enableTransaction(true)
-                .authentication(authenticationPlugin, 
authenticationParameters));
+        // v5 SDK verification client: v5 transactions are 
scalable-topic-only, and the v4 SDK
+        // can't produce/consume on scalable (topic://) topics. 
transactionPolicy(...) opts the
+        // client into transactions and routes it to the scalable-topics (v5) 
coordinator.
+        v5Client = PulsarClient.builder()
+                .serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
+                .authentication(authenticationPlugin, authenticationParameters)
+                
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(5)).build())
+                .build();
     }
 
     @Test
     public void testTransactionPerf() throws Exception {
-        String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u 
%s -ss %s -np 1 -au %s"
+        String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u 
%s -ss %s --scalable -au %s"
                 + " --auth-plugin %s --auth-params %s";
         String testConsumeTopic = testTopic + UUID.randomUUID();
         String testProduceTopic = testTopic + UUID.randomUUID();
@@ -163,27 +174,23 @@ public class Oauth2PerformanceTransactionTest extends 
ProducerConsumerBase {
                 pulsar.getBrokerServiceUrl(), testSub, new 
URL(pulsar.getWebServiceAddress()),
                 authenticationPlugin, authenticationParameters);
 
-        Producer<byte[]> produceToConsumeTopic = 
pulsarClient.newProducer(Schema.BYTES)
-                .producerName("perf-transaction-producer")
-                .sendTimeout(0, TimeUnit.SECONDS)
+        // v5 transactions are scalable-topic-only; scalable topics must be 
pre-created (they don't
+        // auto-create on produce). Create the consume topic so the warm-up 
producer below can write.
+        admin.scalableTopics().createScalableTopic(testConsumeTopic, 1);
+        admin.scalableTopics().createScalableTopic(testProduceTopic, 1);
+
+        Producer<byte[]> produceToConsumeTopic = 
v5Client.newProducer(Schema.bytes())
                 .topic(testConsumeTopic)
                 .create();
-        pulsarClient.newConsumer(Schema.BYTES)
-                .consumerName("perf-transaction-consumeVerify")
+        v5Client.newQueueConsumer(Schema.bytes())
                 .topic(testConsumeTopic)
-                .subscriptionType(SubscriptionType.Shared)
                 .subscriptionName(testSub + "pre")
-                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
                 .subscribe();
-        CountDownLatch countDownLatch = new CountDownLatch(50);
-        for (int i = 0; i < 50
-                ; i++) {
-            produceToConsumeTopic.newMessage().value(("testConsume " + 
i).getBytes()).sendAsync().thenRun(
-                    countDownLatch::countDown);
+        for (int i = 0; i < 50; i++) {
+            produceToConsumeTopic.newMessage().value(("testConsume " + 
i).getBytes()).send();
         }
 
-        countDownLatch.await();
-
         Thread thread = new Thread(() -> {
             try {
                 new PerformanceTransaction().run(args.split(" "));
@@ -193,27 +200,24 @@ public class Oauth2PerformanceTransactionTest extends 
ProducerConsumerBase {
         });
         thread.start();
         thread.join();
-        Consumer<byte[]> consumeFromConsumeTopic = 
pulsarClient.newConsumer(Schema.BYTES)
-                .consumerName("perf-transaction-consumeVerify")
+        QueueConsumer<byte[]> consumeFromConsumeTopic = 
v5Client.newQueueConsumer(Schema.bytes())
                 .topic(testConsumeTopic)
-                .subscriptionType(SubscriptionType.Shared)
                 .subscriptionName(testSub)
-                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
                 .subscribe();
-        Consumer<byte[]> consumeFromProduceTopic = 
pulsarClient.newConsumer(Schema.BYTES)
-                .consumerName("perf-transaction-produceVerify")
+        QueueConsumer<byte[]> consumeFromProduceTopic = 
v5Client.newQueueConsumer(Schema.bytes())
                 .topic(testProduceTopic)
                 .subscriptionName(testSub)
-                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
                 .subscribe();
         for (int i = 0; i < 50; i++) {
-            Message<byte[]> message = consumeFromProduceTopic.receive(2, 
TimeUnit.SECONDS);
+            Message<byte[]> message = 
consumeFromProduceTopic.receive(Duration.ofSeconds(5));
             Assert.assertNotNull(message);
-            consumeFromProduceTopic.acknowledge(message);
+            consumeFromProduceTopic.acknowledge(message.id());
         }
-        Message<byte[]> message = consumeFromConsumeTopic.receive(2, 
TimeUnit.SECONDS);
+        Message<byte[]> message = 
consumeFromConsumeTopic.receive(Duration.ofSeconds(2));
         Assert.assertNull(message);
-        message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS);
+        message = consumeFromProduceTopic.receive(Duration.ofSeconds(2));
         Assert.assertNull(message);
 
     }
diff --git 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
index a2f75b7873d..4070d95d72d 100644
--- 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
+++ 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceTransactionTest.java
@@ -21,41 +21,49 @@ package org.apache.pulsar.testclient;
 import static org.testng.Assert.fail;
 import com.google.common.collect.Sets;
 import java.net.URL;
+import java.time.Duration;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.Cleanup;
 import lombok.CustomLog;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.config.TransactionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+/**
+ * The perf transaction tools target the scalable-topics (v5) coordinator, 
which is transaction-aware
+ * only for scalable {@code topic://} topics (PIP-473). These tests therefore 
pre-create scalable
+ * topics and verify with a v5 SDK client (the v4 SDK can't produce/consume on 
scalable topics). The
+ * tools themselves are unchanged — they just receive {@code topic://} names 
of pre-created topics.
+ */
 @CustomLog
 public class PerformanceTransactionTest extends MockedPulsarServiceBaseTest {
     private final String testTenant = "pulsar";
     private final String testNamespace = "perf";
     private final String myNamespace = testTenant + "/" + testNamespace;
-    private final String testTopic = "persistent://" + myNamespace + "/test-";
+    // v5 transactions are scalable-topic-only; scalable topics use the 
topic:// domain.
+    private final String testTopic = "topic://" + myNamespace + "/test-";
     private final AtomicInteger lastExitCode = new AtomicInteger(0);
 
+    // v5 SDK verification client: v5 transactions are scalable-topic-only, 
and the v4 SDK can't
+    // produce/consume on scalable (topic://) topics.
+    private PulsarClient v5Client;
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -78,11 +86,22 @@ public class PerformanceTransactionTest extends 
MockedPulsarServiceBaseTest {
         
pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
                 
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
                         new PartitionedTopicMetadata(1));
+
+        // transactionPolicy(...) opts the verification client into 
transactions and routes it to the
+        // scalable-topics (v5) coordinator.
+        v5Client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                
.transactionPolicy(TransactionPolicy.builder().timeout(Duration.ofMinutes(5)).build())
+                .build();
     }
 
     @AfterMethod(alwaysRun = true)
     @Override
     protected void cleanup() throws Exception {
+        if (v5Client != null) {
+            v5Client.close();
+            v5Client = null;
+        }
         super.internalCleanup();
         int exitCode = lastExitCode.get();
         if (exitCode != 0) {
@@ -90,47 +109,31 @@ public class PerformanceTransactionTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void testTxnPerf() throws Exception {
-        String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u 
%s -ss %s -rs -np 1 -au %s";
+        String argString = "--topics-c %s --topics-p %s -threads 1 -ntxn 50 -u 
%s -ss %s --scalable -au %s";
         String testConsumeTopic = testTopic + UUID.randomUUID();
         String testProduceTopic = testTopic + UUID.randomUUID();
         String testSub = "testSub";
-        admin.topics().createPartitionedTopic(testConsumeTopic, 1);
         String args = String.format(argString, testConsumeTopic, 
testProduceTopic,
                 pulsar.getBrokerServiceUrl(), testSub, new 
URL(pulsar.getWebServiceAddress()));
 
-        @Cleanup
-        PulsarClient pulsarClient = PulsarClient.builder()
-                .enableTransaction(true)
-                .serviceUrl(pulsar.getBrokerServiceUrl())
-                .connectionsPerBroker(100)
-                .statsInterval(0, TimeUnit.SECONDS)
-                .build();
-        @Cleanup
-        Producer<byte[]> produceToConsumeTopic = 
pulsarClient.newProducer(Schema.BYTES)
-                .producerName("perf-transaction-producer")
-                .sendTimeout(0, TimeUnit.SECONDS)
+        // Scalable topics must be pre-created (they don't auto-create on 
produce).
+        admin.scalableTopics().createScalableTopic(testConsumeTopic, 1);
+        admin.scalableTopics().createScalableTopic(testProduceTopic, 1);
+
+        Producer<byte[]> produceToConsumeTopic = 
v5Client.newProducer(Schema.bytes())
                 .topic(testConsumeTopic)
                 .create();
-        @Cleanup
-        final Consumer<byte[]> consumer = 
pulsarClient.newConsumer(Schema.BYTES)
-                .consumerName("perf-transaction-consumeVerify")
+        v5Client.newQueueConsumer(Schema.bytes())
                 .topic(testConsumeTopic)
-                .subscriptionType(SubscriptionType.Shared)
                 .subscriptionName(testSub + "pre")
-                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
                 .subscribe();
-        CountDownLatch countDownLatch = new CountDownLatch(50);
-        for (int i = 0; i < 50
-                ; i++) {
-            produceToConsumeTopic.newMessage().value(("testConsume " + 
i).getBytes()).sendAsync().thenRun(
-                    countDownLatch::countDown);
+        for (int i = 0; i < 50; i++) {
+            produceToConsumeTopic.newMessage().value(("testConsume " + 
i).getBytes()).send();
         }
 
-        countDownLatch.await();
-
         Thread thread = new Thread(() -> {
             try {
                 new PerformanceTransaction().run(args.split(" "));
@@ -141,54 +144,41 @@ public class PerformanceTransactionTest extends 
MockedPulsarServiceBaseTest {
         thread.start();
         thread.join();
 
-        // Wait for all async transaction commits to complete before verifying 
messages
-        Awaitility.await().untilAsserted(() -> {
-            admin.transactions().getCoordinatorStats().forEach((integer, 
transactionCoordinatorStats) -> {
-                
Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0);
-            });
-        });
-
-        Assert.assertTrue(admin.topics().getPartitionedStats(testConsumeTopic, 
false)
-                .getSubscriptions().get(testSub).isReplicated());
-        @Cleanup
-        Consumer<byte[]> consumeFromConsumeTopic = 
pulsarClient.newConsumer(Schema.BYTES)
-                .consumerName("perf-transaction-consumeVerify")
+        QueueConsumer<byte[]> consumeFromConsumeTopic = 
v5Client.newQueueConsumer(Schema.bytes())
                 .topic(testConsumeTopic)
-                .subscriptionType(SubscriptionType.Shared)
                 .subscriptionName(testSub)
-                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
                 .subscribe();
-        @Cleanup
-        Consumer<byte[]> consumeFromProduceTopic = 
pulsarClient.newConsumer(Schema.BYTES)
-                .consumerName("perf-transaction-produceVerify")
+        QueueConsumer<byte[]> consumeFromProduceTopic = 
v5Client.newQueueConsumer(Schema.bytes())
                 .topic(testProduceTopic)
                 .subscriptionName(testSub)
-                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
                 .subscribe();
         for (int i = 0; i < 50; i++) {
-            Message<byte[]> message = consumeFromProduceTopic.receive(10, 
TimeUnit.SECONDS);
+            Message<byte[]> message = 
consumeFromProduceTopic.receive(Duration.ofSeconds(10));
             Assert.assertNotNull(message);
-            consumeFromProduceTopic.acknowledge(message);
+            consumeFromProduceTopic.acknowledge(message.id());
         }
-        Message<byte[]> message = consumeFromConsumeTopic.receive(2, 
TimeUnit.SECONDS);
+        Message<byte[]> message = 
consumeFromConsumeTopic.receive(Duration.ofSeconds(2));
         Assert.assertNull(message);
-        message = consumeFromProduceTopic.receive(2, TimeUnit.SECONDS);
+        message = consumeFromProduceTopic.receive(Duration.ofSeconds(2));
         Assert.assertNull(message);
-
     }
 
-
     @Test
-    public void testProduceTxnMessage() throws InterruptedException, 
PulsarClientException {
+    public void testProduceTxnMessage() throws Exception {
         String argString = "%s -r 50 -u %s -m %d -txn";
         String topic = testTopic + UUID.randomUUID();
         int totalMessage = 100;
         String args = String.format(argString, topic, 
pulsar.getBrokerServiceUrl(), totalMessage);
+
+        admin.scalableTopics().createScalableTopic(topic, 1);
+
         @Cleanup
-        final Consumer<byte[]> subscribe = 
pulsarClient.newConsumer().subscriptionName("subName" + "pre").topic(topic)
-                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-                .subscriptionType(SubscriptionType.Exclusive)
-                .enableBatchIndexAcknowledgment(false)
+        QueueConsumer<byte[]> subscribe = 
v5Client.newQueueConsumer(Schema.bytes())
+                .subscriptionName("subName" + "pre")
+                .topic(topic)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
                 .subscribe();
         Thread thread = new Thread(() -> {
             try {
@@ -200,51 +190,60 @@ public class PerformanceTransactionTest extends 
MockedPulsarServiceBaseTest {
         thread.start();
         thread.join();
 
-        Awaitility.await().untilAsserted(() -> {
-            admin.transactions().getCoordinatorStats().forEach((integer, 
transactionCoordinatorStats) -> {
-                
Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0);
-            });
-        });
-
         @Cleanup
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().subscriptionName("subName").topic(topic)
-                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-                .subscriptionType(SubscriptionType.Exclusive)
-                .enableBatchIndexAcknowledgment(false)
+        QueueConsumer<byte[]> consumer = 
v5Client.newQueueConsumer(Schema.bytes())
+                .subscriptionName("subName")
+                .topic(topic)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
                 .subscribe();
-        for (int i = 0; i < totalMessage; i++) {
-           Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
-           Assert.assertNotNull(message);
-           consumer.acknowledge(message);
+        // PerformanceProducer commits its transactions asynchronously and 
run() returns without
+        // awaiting them, so committed messages may still be becoming visible 
after the join. Drain
+        // up to a deadline rather than assuming all are immediately readable.
+        int received = 0;
+        long deadline = System.currentTimeMillis() + 30_000;
+        while (received < totalMessage && System.currentTimeMillis() < 
deadline) {
+            Message<byte[]> message = consumer.receive(Duration.ofSeconds(2));
+            if (message == null) {
+                continue;
+            }
+            consumer.acknowledge(message.id());
+            received++;
         }
-        Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+        Assert.assertEquals(received, totalMessage, "all committed produced 
messages must be delivered");
+        Message<byte[]> message = consumer.receive(Duration.ofSeconds(2));
         Assert.assertNull(message);
     }
 
     @Test
     public void testConsumeTxnMessage() throws Exception {
-        String argString = "%s -r 50 -u %s -txn -ss %s -st %s -sp %s -ntxn %d 
-tto 5";
+        // A long transaction timeout (-tto) so none of the consumer's 
transactions time out and abort
+        // on the slower scalable-topic path: an aborted txn would release its 
pending-acked messages
+        // for redelivery and inflate what the verifier sees below.
+        String argString = "%s -r 50 -u %s -txn -ss %s -st %s -sp %s -ntxn %d 
-tto 60";
         String subName = "sub";
         String topic = testTopic + UUID.randomUUID();
+        // -st is PerformanceConsumer's own SubscriptionType enum (Exclusive); 
-sp is the v5
+        // SubscriptionInitialPosition enum (EARLIEST).
         String args = String.format(argString, topic, 
pulsar.getBrokerServiceUrl(), subName,
-                SubscriptionType.Exclusive, 
SubscriptionInitialPosition.Earliest, 10);
+                "Exclusive", "EARLIEST", 10);
+
+        admin.scalableTopics().createScalableTopic(topic, 1);
+
         @Cleanup
-        Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic).sendTimeout(0, TimeUnit.SECONDS)
-                .create();
+        Producer<byte[]> producer = 
v5Client.newProducer(Schema.bytes()).topic(topic).create();
         @Cleanup
-        final Consumer<byte[]> subscribe = 
pulsarClient.newConsumer(Schema.BYTES)
-                .consumerName("perf-transaction-consumeVerify")
+        QueueConsumer<byte[]> subscribe = 
v5Client.newQueueConsumer(Schema.bytes())
                 .topic(topic)
-                .subscriptionType(SubscriptionType.Shared)
                 .subscriptionName(subName + "pre")
-                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
                 .subscribe();
-        for (int i = 0; i < 505; i++) {
+        // Exactly numMessagesPerTransaction (50) * -ntxn (10) messages, so 
the perf consumer commits
+        // all of them across 10 transactions and leaves the subscription 
empty.
+        for (int i = 0; i < 500; i++) {
             producer.newMessage().value("messages for test transaction 
consumer".getBytes()).send();
         }
         Thread thread = new Thread(() -> {
             try {
-                log.info("");
                 new PerformanceConsumer().run(args.split(" "));
             } catch (Exception e) {
                 e.printStackTrace();
@@ -253,26 +252,16 @@ public class PerformanceTransactionTest extends 
MockedPulsarServiceBaseTest {
         thread.start();
         thread.join();
 
-        Awaitility.await().untilAsserted(() -> {
-            admin.transactions().getCoordinatorStats().forEach((integer, 
transactionCoordinatorStats) -> {
-                
Assert.assertEquals(transactionCoordinatorStats.ongoingTxnSize, 0);
-            });
-        });
-
+        // The perf consumer committed all 10 txns * 50 msgs = 500 
transactional acks, so every message
+        // is permanently acknowledged and a fresh consumer on the same 
subscription sees nothing.
         @Cleanup
-        Consumer<byte[]> consumer = 
pulsarClient.newConsumer().subscriptionName(subName).topic(topic)
-                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
-                .subscriptionType(SubscriptionType.Exclusive)
-                .enableBatchIndexAcknowledgment(false)
+        QueueConsumer<byte[]> consumer = 
v5Client.newQueueConsumer(Schema.bytes())
+                .subscriptionName(subName)
+                .topic(topic)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
                 .subscribe();
-        for (int i = 0; i < 5; i++) {
-            Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
-            Assert.assertNotNull(message);
-            consumer.acknowledge(message);
-        }
-        Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
-        Assert.assertNull(message);
-
+        Message<byte[]> message = consumer.receive(Duration.ofSeconds(2));
+        Assert.assertNull(message, "all transactionally-acked messages must 
stay acknowledged");
     }
 
 }
diff --git a/tests/integration/build.gradle.kts 
b/tests/integration/build.gradle.kts
index 693423e5f06..7fb9809a469 100644
--- a/tests/integration/build.gradle.kts
+++ b/tests/integration/build.gradle.kts
@@ -30,6 +30,8 @@ dependencies {
     testImplementation(project(path = ":pulsar-broker-common", configuration = 
"testJar"))
     testImplementation(project(":pulsar-common"))
     testImplementation(project(":pulsar-client-original"))
+    testImplementation(project(":pulsar-client-api-v5"))
+    testImplementation(project(":pulsar-client-v5"))
     testImplementation(project(":pulsar-client-admin-original"))
     testImplementation(project(":pulsar-proxy"))
     testImplementation(project(":managed-ledger"))
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
index d9a2997f240..b3a7c9cceb8 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
@@ -18,135 +18,156 @@
  */
 package org.apache.pulsar.tests.integration.transaction;
 
-import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import java.util.HashSet;
-import java.util.Set;
+import static org.testng.Assert.assertNull;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.CustomLog;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.transaction.Transaction;
-import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import 
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
-import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
 /**
- * Integration test for the metadata-store transaction-coordinator discovery 
path (PIP-473 P5.3).
+ * Integration test for transaction-coordinator coexistence (PIP-473): on a 
cluster with the
+ * scalable-topics (v5) coordinator enabled, a v5 SDK client routes its 
transactions to the
+ * metadata-store coordinator while a v4 SDK client keeps using the legacy 
coordinator — both on the
+ * same cluster. Routing is decided by client/SDK kind (the v5 SDK sets an 
internal flag), not by
+ * broker capability, so flipping the broker default to enable v5 must not 
break v4 transactions.
  *
- * <p>Verifies, across a real multi-broker docker cluster, that a client 
discovers coordinators via
- * the {@code CommandWatchTcAssignments} stream (not the assign-topic lookup) 
and can drive the
- * transaction lifecycle, including after the broker leading a coordinator 
partition is killed.
- *
- * @see TcMetadataDiscoveryTestBase for the scope note (lifecycle, not 
data-in-txn).
+ * <p>Scope: transaction-coordinator routing + the v5 lifecycle/failover. Full 
v5 data-in-transaction
+ * (produce/ack on segment topics) is exercised separately.
  */
 @CustomLog
 public class TcMetadataDiscoveryTest extends TcMetadataDiscoveryTestBase {
 
     /**
-     * With the scalable-topics TC enabled, a client opens the assignment 
watch and can open and
-     * commit / abort transactions across all coordinator partitions. Running 
many transactions
-     * exercises the round-robin spread across the watch-discovered per-leader 
connections.
+     * A v5 SDK client runs many transactions (commit and abort) against the 
v5-enabled cluster. This
+     * only succeeds if the client routed to the running metadata-store 
coordinator — a regression
+     * that broke the watch path or mis-routed v5 to the legacy TC would fail 
here.
      */
     @Test
-    public void transactionLifecycleOverMetadataDiscovery() throws Exception {
+    public void v5SdkTransactionsUseMetadataCoordinator() throws Exception {
         @Cleanup
-        PulsarClient client = PulsarClient.builder()
-                .enableTransaction(true)
-                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
-                .build();
-
-        // Guard against a silent fallback: assert the client actually 
selected the metadata-store
-        // assignment-watch path. Otherwise a regression that breaks the watch 
entirely would still
-        // pass, since the assign topic is initialized with the same partition 
count.
-        
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl 
tcClient =
-                ((org.apache.pulsar.client.impl.PulsarClientImpl) 
client).getTcClient();
-        assertTrue(tcClient.isUsingMetadataDiscovery(),
-                "client should use metadata-store TC discovery, not the 
assign-topic fallback");
-
-        // Run transactions (commit and abort alternately) until every 
coordinator partition has
-        // minted at least one — proving the client discovered and connected 
to each partition's
-        // elected leader. An await loop tolerates the brief startup window 
where a partition is
-        // still mid-election and absent from the assignment snapshot.
-        Set<Long> coordinatorsExercised = new HashSet<>();
-        final int[] i = {0};
-        Awaitility.await()
-                .atMost(1, TimeUnit.MINUTES)
-                .pollInterval(1, TimeUnit.SECONDS)
-                .until(() -> {
-                    Transaction txn = client.newTransaction()
-                            .withTransactionTimeout(1, TimeUnit.MINUTES)
-                            .build().get();
-                    TxnID txnId = txn.getTxnID();
-                    assertNotNull(txnId);
-                    // mostSigBits is the coordinator (TC partition) that 
minted the txn.
-                    coordinatorsExercised.add(txnId.getMostSigBits());
-                    if (i[0]++ % 2 == 0) {
-                        txn.commit().get();
-                    } else {
-                        txn.abort().get();
-                    }
-                    return coordinatorsExercised.size() == TC_PARALLELISM;
-                });
-        assertEquals(coordinatorsExercised.size(), TC_PARALLELISM,
-                "expected transactions to be coordinated by every TC 
partition; got "
-                        + coordinatorsExercised);
+        org.apache.pulsar.client.api.v5.PulsarClient client =
+                org.apache.pulsar.client.api.v5.PulsarClient.builder()
+                        .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                        
.transactionPolicy(org.apache.pulsar.client.api.v5.config.TransactionPolicy.builder()
+                                
.timeout(java.time.Duration.ofMinutes(1)).build())
+                        .build();
+
+        // Enough transactions that the client's round-robin visits every 
coordinator partition; if
+        // any were mis-routed to the legacy TC (which doesn't coordinate 
scalable transactions) or
+        // the watch path were broken, these would fail.
+        runV5Transactions(client, TC_PARALLELISM * 4);
     }
 
     /**
-     * Kill one broker and confirm the client keeps working: the coordinator 
partitions that broker
-     * was leading are re-elected to the survivor, the client's assignment 
watch receives the new
-     * snapshot, retargets its handlers, and subsequent transactions across 
all partitions still
-     * succeed.
+     * Kill one broker and confirm the v5 SDK client keeps working: 
coordinator partitions led by the
+     * dead broker are re-elected to the survivor, the client's assignment 
watch retargets, and
+     * subsequent transactions still succeed.
      */
     @Test
-    public void transactionsSurviveLeaderBrokerFailure() throws Exception {
+    public void v5SdkTransactionsSurviveLeaderBrokerFailure() throws Exception 
{
         @Cleanup
-        PulsarClient client = PulsarClient.builder()
-                .enableTransaction(true)
-                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
-                .operationTimeout(30, TimeUnit.SECONDS)
-                .build();
+        org.apache.pulsar.client.api.v5.PulsarClient client =
+                org.apache.pulsar.client.api.v5.PulsarClient.builder()
+                        .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                        
.transactionPolicy(org.apache.pulsar.client.api.v5.config.TransactionPolicy.builder()
+                                
.timeout(java.time.Duration.ofMinutes(1)).build())
+                        .build();
 
-        // Warm up: confirm every coordinator is reachable before the failure.
-        runTxnOnEveryCoordinator(client);
+        runV5Transactions(client, TC_PARALLELISM * 4);
 
-        // Kill one broker — about half the coordinator partitions lose their 
leader.
         BrokerContainer victim = pulsarCluster.getBrokers().iterator().next();
         log.info().attr("broker", victim.getContainerName()).log("Stopping 
broker to force TC failover");
         victim.stop();
 
-        // After re-election + assignment-watch refresh, transactions across 
all partitions succeed
-        // again. runTxnOnEveryCoordinator already retries within a bounded 
wait while leadership and
-        // the client's handlers converge on the new leaders.
-        runTxnOnEveryCoordinator(client);
+        // After re-election + assignment-watch refresh, transactions succeed 
again. runV5Transactions
+        // retries within a bounded wait while leadership and the client's 
handlers converge.
+        runV5Transactions(client, TC_PARALLELISM * 4);
     }
 
     /**
-     * Open + commit one transaction on each coordinator partition; asserts 
all are covered within a
-     * bounded wait. A coordinator's handler connects asynchronously (and, 
after a failover, may be
-     * briefly mid-reconnect), so a transaction routed to a not-yet-ready 
coordinator throws
-     * {@code MetaStoreHandlerNotReadyException} / times out — those are 
retried rather than failing
-     * the run. The assertion is "every coordinator becomes reachable", not 
"reachable on the first
-     * attempt".
+     * Run {@code count} v5 transactions (commit) back to back. A 
transaction that fails (e.g. a
+     * coordinator still connecting, or mid-reconnect after a failover) is 
retried every 500 ms until
+     * one 3-minute deadline shared by the whole run, after which the failure 
is rethrown. Driving them
+     * in a tight loop keeps wall-clock bounded by transaction latency, not by 
a poll interval.
      */
-    private void runTxnOnEveryCoordinator(PulsarClient client) {
-        Set<Long> coordinators = new HashSet<>();
-        Awaitility.await()
-                .atMost(90, TimeUnit.SECONDS)
-                .pollInterval(2, TimeUnit.SECONDS)
-                .ignoreExceptions()
-                .until(() -> {
-                    Transaction txn = client.newTransaction()
-                            .withTransactionTimeout(1, TimeUnit.MINUTES)
-                            .build().get();
-                    coordinators.add(txn.getTxnID().getMostSigBits());
-                    txn.commit().get();
-                    return coordinators.size() == TC_PARALLELISM;
-                });
-        assertTrue(coordinators.size() == TC_PARALLELISM,
-                "expected all " + TC_PARALLELISM + " coordinators reachable; 
got " + coordinators);
+    private void 
runV5Transactions(org.apache.pulsar.client.api.v5.PulsarClient client, int 
count)
+            throws Exception {
+        long deadline = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(3);
+        for (int i = 0; i < count; i++) {
+            while (true) {
+                try {
+                    org.apache.pulsar.client.api.v5.Transaction txn = 
client.newTransaction();
+                    txn.commit();
+                    break;
+                } catch (Exception e) {
+                    if (System.currentTimeMillis() > deadline) {
+                        throw e;
+                    }
+                    Thread.sleep(500);
+                }
+            }
+        }
+    }
+
+    /**
+     * Coexistence: with the v5 coordinator enabled on the cluster, a v4 SDK client running a
+     * transaction on a {@code persistent://} topic must still use the legacy coordinator and work end
+     * to end. Routing by client kind means the v4 client's commands carry no {@code scalable} flag,
+     * so the broker sends them to the legacy TC. This is the regression guard for the P5.4 default
+     * flip.
+     */
+    @Test
+    public void v4SdkTransactionStillUsesLegacyCoordinator() throws Exception {
+        @Cleanup
+        org.apache.pulsar.client.api.PulsarClient client =
+                org.apache.pulsar.client.api.PulsarClient.builder()
+                        .enableTransaction(true)
+                        .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                        .build();
+
+        // Routing assertion: a v4 SDK client must NOT use metadata-store discovery even though the
+        // cluster has the v5 coordinator enabled — it stays on the legacy assign-topic coordinator.
+        TransactionCoordinatorClientImpl tcClient = ((PulsarClientImpl) 
client).getTcClient();
+        assertFalse(tcClient.isUsingMetadataDiscovery(),
+                "v4 SDK client must use the legacy coordinator, not 
metadata-store discovery");
+
+        String topic = "persistent://public/default/v4-coexist-" + 
randomName(6);
+
+        @Cleanup
+        Producer<String> producer = 
client.newProducer(Schema.STRING).topic(topic).create();
+        @Cleanup
+        Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("coexist-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        // Non-transactional produce, then a transactional ack — exercises the legacy TC end to end
+        // (newTxn -> addSubscription -> endTxn) on a persistent topic.
+        producer.send("m1");
+        Message<String> msg = consumer.receive(15, TimeUnit.SECONDS);
+        assertNotNull(msg, "should receive the produced message");
+
+        Transaction txn = client.newTransaction()
+                .withTransactionTimeout(1, TimeUnit.MINUTES)
+                .build().get();
+        consumer.acknowledgeAsync(msg.getMessageId(), txn).get();
+        txn.commit().get();
+
+        // After commit the message is acknowledged: redelivery on reconnect must not return it.
+        consumer.redeliverUnacknowledgedMessages();
+        assertNull(consumer.receive(5, TimeUnit.SECONDS),
+                "committed transactional ack should have consumed the 
message");
     }
 }


Reply via email to