This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 17ead1d691c [improve][broker] PIP-473 P5.3: metadata-store TC leader 
election + assignment watch (#25929)
17ead1d691c is described below

commit 17ead1d691cde5a02750a2fd0f62ee608203be6a
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Jun 4 17:41:35 2026 -0700

    [improve][broker] PIP-473 P5.3: metadata-store TC leader election + 
assignment watch (#25929)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  16 ++
 .../apache/pulsar/broker/service/ServerCnx.java    |  94 ++++++-
 .../coordinator/v5/TransactionCoordinatorV5.java   | 229 ++++++++++++++---
 .../broker/transaction/metadata/TcLeader.java      |  39 +++
 .../broker/transaction/metadata/TxnPaths.java      |  22 ++
 .../v5/TransactionCoordinatorV5Test.java           | 112 +++++++-
 .../client/impl/TransactionClientConnectTest.java  |  12 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  68 +++++
 .../pulsar/client/impl/ConnectionHandler.java      |  29 ++-
 .../apache/pulsar/client/impl/HandlerState.java    |   7 +
 .../client/impl/TransactionMetaStoreHandler.java   |  65 ++++-
 .../impl/transaction/AssignTopicTcDiscovery.java   | 124 +++++++++
 .../client/impl/transaction/TcDiscovery.java       |  67 +++++
 .../TransactionCoordinatorClientImpl.java          | 138 +++++-----
 .../transaction/WatchTcAssignmentsDiscovery.java   | 285 +++++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |  66 +++++
 .../pulsar/common/protocol/PulsarDecoder.java      |  32 +++
 pulsar-common/src/main/proto/PulsarApi.proto       |  51 ++++
 .../common/protocol/CommandsTcAssignmentsTest.java | 125 +++++++++
 .../pulsar/proxy/server/ProxyConnection.java       |   4 +-
 .../transaction/TcMetadataDiscoveryTest.java       | 152 +++++++++++
 .../transaction/TcMetadataDiscoveryTestBase.java   |  71 +++++
 .../src/test/resources/pulsar-transaction.xml      |   1 +
 23 files changed, 1695 insertions(+), 114 deletions(-)

diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index e5e69ea14ad..f10b2cbdc32 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -3834,6 +3834,22 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     )
     private int transactionCoordinatorScalableTopicsGcRetentionSeconds = 900;
 
+    @FieldContext(
+            category = CATEGORY_TRANSACTION,
+            minValue = 1,
+            doc = "Degree of parallelism for the scalable-topics transaction 
coordinator: how many"
+                    + " independent coordinator instances run across the 
cluster. Each is"
+                    + " leader-elected independently in the metadata store and 
coordinates the"
+                    + " transactions whose id maps to it. Fixed at cluster 
bring-up — changing it"
+                    + " later would strand the coordinator id encoded in 
existing transaction ids"
+                    + " (and, because an aborted transaction's records are 
retained as long as its"
+                    + " messages are, the value can only be reduced once all 
transactions created"
+                    + " under the previous value have been fully cleaned up). 
All brokers must agree"
+                    + " on this value; a mismatch is rejected at startup. Only 
relevant when"
+                    + " transactionCoordinatorScalableTopicsEnabled = true."
+    )
+    private int transactionCoordinatorScalableTopicsParallelism = 16;
+
     @FieldContext(
         category = CATEGORY_TRANSACTION,
             doc = "Class name for transaction metadata store provider"
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index af7e8930f36..00253bbb431 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -108,6 +108,7 @@ import 
org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaExce
 import 
org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException;
 import org.apache.pulsar.broker.topiclistlimit.TopicListMemoryLimiter;
 import org.apache.pulsar.broker.topiclistlimit.TopicListSizeResultCache;
+import 
org.apache.pulsar.broker.transaction.coordinator.v5.TransactionCoordinatorV5;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TxnID;
@@ -505,6 +506,10 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         });
         scalableTopicsWatchers.clear();
 
+        // Same for transaction-coordinator assignment watchers.
+        tcAssignmentWatchers.values().forEach(this::closeQuietly);
+        tcAssignmentWatchers.clear();
+
         // Notify the scalable-topic controller that this connection's 
scalable consumers
         // have dropped. The controller marks them disconnected and starts the 
grace-period
         // timer; if they reconnect in time, their assignment is preserved.
@@ -865,6 +870,13 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             ScalableTopicsWatcherSession>
             scalableTopicsWatchers = new ConcurrentHashMap<>();
 
+    // --- Transaction-coordinator assignment watchers ---
+    // watchId -> deregistration handle for the listener registered on 
TransactionCoordinatorV5.
+    private final ConcurrentHashMap<Long, AutoCloseable> tcAssignmentWatchers 
= new ConcurrentHashMap<>();
+    // Delay before re-pushing a TC-assignment snapshot that was incomplete (a 
partition mid-election)
+    // or that failed to build, so the client converges without waiting for an 
external trigger.
+    private static final long TC_ASSIGNMENTS_REPUSH_DELAY_MS = 1000L;
+
     @Override
     protected void handleCommandWatchScalableTopics(
             CommandWatchScalableTopics cmd) {
@@ -965,6 +977,85 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
         }
     }
 
+    // --- Transaction-coordinator assignment watch ---
+
+    /**
+     * Opens a transaction-coordinator assignment watch for the requesting client.
+     *
+     * <p>The request is rejected with {@code NotAllowedError} when the scalable-topics
+     * transaction coordinator is disabled in the broker configuration, and with
+     * {@code ServiceNotReady} when no {@link TransactionCoordinatorV5} instance is available.
+     * Otherwise a change listener is registered on the coordinator, its deregistration handle
+     * is stored under the watch id (closing any handle previously stored for the same id), and
+     * an initial snapshot is pushed.
+     *
+     * @param cmd the watch request carrying the client-chosen watch id
+     */
+    @Override
+    protected void handleCommandWatchTcAssignments(
+            org.apache.pulsar.common.api.proto.CommandWatchTcAssignments cmd) {
+        checkArgument(state == State.Connected);
+        final long watchId = cmd.getWatchId();
+        log.debug().attr("watchId", watchId).log("Received WatchTcAssignments");
+
+        final var pulsarService = service.getPulsar();
+        final boolean tcEnabled =
+                pulsarService.getConfig().isTransactionCoordinatorScalableTopicsEnabled();
+        if (!tcEnabled) {
+            ctx.writeAndFlush(Commands.newWatchTcAssignmentsError(watchId, ServerError.NotAllowedError,
+                    "Scalable-topics transaction coordinator is disabled on this broker"));
+            return;
+        }
+
+        final TransactionCoordinatorV5 tc = pulsarService.getTransactionCoordinatorV5();
+        if (tc == null) {
+            ctx.writeAndFlush(Commands.newWatchTcAssignmentsError(watchId, ServerError.ServiceNotReady,
+                    "Transaction coordinator not ready"));
+            return;
+        }
+
+        // No per-topic authorization is applied: this is broker-internal coordination, so an
+        // authenticated connection is considered sufficient (same trust model as
+        // TC_CLIENT_CONNECT).
+        // Every leadership change hops onto this connection's event loop and re-pushes the
+        // complete snapshot.
+        final Runnable onLeadershipChange =
+                () -> ctx.executor().execute(() -> sendTcAssignmentsSnapshot(watchId, tc));
+        final AutoCloseable registration = tc.registerAssignmentChangeListener(onLeadershipChange);
+
+        // A watch id that is re-used replaces the earlier registration, which is released here.
+        closeQuietly(tcAssignmentWatchers.put(watchId, registration));
+
+        // Initial snapshot, sent once the listener is in place.
+        sendTcAssignmentsSnapshot(watchId, tc);
+    }
+
+    private void sendTcAssignmentsSnapshot(long watchId, 
TransactionCoordinatorV5 tc) {
+        if (!tcAssignmentWatchers.containsKey(watchId)) {
+            return;
+        }
+        tc.buildAssignmentsSnapshot().thenAccept(snapshot -> 
ctx.executor().execute(() -> {
+            if (!tcAssignmentWatchers.containsKey(watchId)) {
+                return;
+            }
+            java.util.Map<Integer, String[]> leaders = new 
java.util.HashMap<>();
+            snapshot.assignments().forEach((partition, leader) -> 
leaders.put(partition,
+                    new String[] {leader.brokerServiceUrl(), 
leader.brokerServiceUrlTls()}));
+            ctx.writeAndFlush(Commands.newWatchTcAssignmentsSnapshot(
+                    watchId, snapshot.partitionCount(), leaders));
+            // If some partition is still mid-election, the snapshot is 
incomplete. Schedule a single
+            // delayed re-push so the client doesn't stay parked on a missing 
partition waiting for a
+            // leadership change that may never come (the cache repopulating 
fires no TC listener).
+            if (!snapshot.isComplete()) {
+                ctx.executor().schedule(() -> 
sendTcAssignmentsSnapshot(watchId, tc),
+                        TC_ASSIGNMENTS_REPUSH_DELAY_MS, TimeUnit.MILLISECONDS);
+            }
+        })).exceptionally(ex -> {
+            log.warn().attr("watchId", watchId).exception(ex)
+                    .log("Failed to build TC-assignments snapshot; retrying 
shortly");
+            ctx.executor().schedule(() -> sendTcAssignmentsSnapshot(watchId, 
tc),
+                    TC_ASSIGNMENTS_REPUSH_DELAY_MS, TimeUnit.MILLISECONDS);
+            return null;
+        });
+    }
+
+    @Override
+    protected void handleCommandWatchTcAssignmentsClose(
+            org.apache.pulsar.common.api.proto.CommandWatchTcAssignmentsClose 
cmd) {
+        checkArgument(state == State.Connected);
+        long watchId = cmd.getWatchId();
+        log.debug().attr("watchId", watchId).log("Received 
WatchTcAssignmentsClose");
+        closeQuietly(tcAssignmentWatchers.remove(watchId));
+    }
+
+    /**
+     * Closes {@code handle} if it is non-null, logging (never propagating) any exception the
+     * close raises.
+     *
+     * @param handle the handle to close; may be {@code null}
+     */
+    private void closeQuietly(AutoCloseable handle) {
+        if (handle != null) {
+            try {
+                handle.close();
+            } catch (Exception e) {
+                log.warn().exceptionMessage(e).log("Error closing TC-assignment watcher");
+            }
+        }
+    }
+
     @Override
     protected void handleCommandScalableTopicClose(
             CommandScalableTopicClose commandScalableTopicClose) {
@@ -1316,7 +1407,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             maybeScheduleAuthenticationCredentialsRefresh();
         }
         writeAndFlush(Commands.newConnected(clientProtoVersion, 
maxMessageSize, enableTopicListWatcher,
-                scalableTopicsEnabled));
+                scalableTopicsEnabled,
+                
service.getPulsar().getConfig().isTransactionCoordinatorScalableTopicsEnabled()));
         state = State.Connected;
         service.getPulsarStats().recordConnectionCreateSuccess();
         log.debug()
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
index d4bfd8581a2..22ad4aee3ef 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5.java
@@ -24,9 +24,13 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -35,6 +39,7 @@ import java.util.function.Supplier;
 import lombok.CustomLog;
 import org.apache.pulsar.broker.PulsarService;
 import 
org.apache.pulsar.broker.transaction.exception.coordinator.TransactionCoordinatorException;
+import org.apache.pulsar.broker.transaction.metadata.TcLeader;
 import org.apache.pulsar.broker.transaction.metadata.TxnEvent;
 import org.apache.pulsar.broker.transaction.metadata.TxnHeader;
 import org.apache.pulsar.broker.transaction.metadata.TxnIds;
@@ -50,6 +55,8 @@ import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.ScanConsumer;
+import org.apache.pulsar.metadata.api.coordination.LeaderElection;
+import org.apache.pulsar.metadata.api.coordination.LeaderElectionState;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
@@ -57,10 +64,20 @@ import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException
 /**
  * Metadata-driven transaction coordinator for scalable topics — broker-side 
service.
  *
- * <p>Per-partition coordinator. A broker runs the TC for partition {@code N} 
iff it owns
- * partition {@code N} of {@code 
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN} — same
- * leader-election mechanism the legacy {@code 
TransactionMetadataStoreService} uses; reusing
- * it keeps the client-side discovery surface unchanged.
+ * <p>Per-partition coordinator. Leadership rests on the metadata store 
directly: each TC
+ * partition {@code N} has a {@link LeaderElection} at {@code 
/txn/tc/leader/<N>}, and a broker
+ * runs the TC for partition {@code N} iff it currently leads that election. 
This removes the
+ * dependency on the {@code transaction_coordinator_assign} topic and its 
bundle ownership — TC
+ * coordination liveness no longer rides on the topic/namespace/load-balancer 
machinery, only on
+ * the metadata store (which the TC already hard-depends on for every header 
read/write).
+ *
+ * <p><b>Distribution.</b> Every broker calls {@code elect()} on every 
partition (elect-all):
+ * the {@code LeaderElection} primitive only fails a leader over to a broker 
that is already a
+ * candidate, so to keep every partition survivable every broker must be a 
candidate for every
+ * partition. The N independent elections start concurrently, so on a co-start 
leadership lands
+ * roughly balanced across brokers, and every partition has B−1 standby 
candidates for instant
+ * failover. (After a strictly sequential scale-up an early broker can hold 
more partitions until
+ * it restarts; TC load is light, so v1 does not actively rebalance.)
  *
  * <p>Wire commands handled (routed by {@code ServerCnx} when
  * {@code transactionCoordinatorScalableTopicsEnabled} is on):
@@ -71,6 +88,8 @@ import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException
  *       advertise themselves by writing {@code /txn/op} records, so the TC 
doesn't need a
  *       pre-registration step.</li>
  *   <li>{@code END_TXN} → {@link #endTransaction}</li>
+ *   <li>{@code WATCH_TC_ASSIGNMENTS} → {@link #buildAssignmentsSnapshot} + 
push-on-change, the
+ *       client's discovery surface (which broker leads which partition).</li>
  * </ul>
  *
  * <p>{@code endTransaction} CAS-updates the header to the terminal state, 
enumerates
@@ -79,18 +98,18 @@ import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException
  * {@code (segment, subscription)} pair. The fan-out is metadata-store writes 
(not RPCs) and
  * is bounded by the txn's participant count.
  *
- * <p>Background sweeps: a single elected broker — the owner of partition 0 of
- * {@code transaction_coordinator_assign} — periodically (a) aborts timed-out 
open transactions
- * ({@link #sweepTimeouts}) and (b) garbage-collects finalized transactions 
whose retention has
- * elapsed ({@link #sweepGc}). Concurrent sweeps from a stale owner are still 
safe — every state
- * transition is a header CAS — so the single-sweeper election is an 
efficiency measure, not a
- * correctness one.
+ * <p>Background sweeps: the broker that leads partition 0 periodically (a) 
aborts timed-out open
+ * transactions ({@link #sweepTimeouts}) and (b) garbage-collects finalized 
transactions whose
+ * retention has elapsed ({@link #sweepGc}). Concurrent sweeps from a stale 
leader are still safe
+ * — every state transition is a header CAS — so the single-sweeper election 
is an efficiency
+ * measure, not a correctness one.
  */
 @CustomLog
 public class TransactionCoordinatorV5 {
 
     private final PulsarService pulsar;
     private final TxnMetadataStore txnStore;
+    private final int partitionCount;
 
     private final long timeoutSweepIntervalMs;
     private final long gcSweepIntervalMs;
@@ -100,10 +119,18 @@ public class TransactionCoordinatorV5 {
     private final AtomicBoolean timeoutSweepRunning = new AtomicBoolean(false);
     private final AtomicBoolean gcSweepRunning = new AtomicBoolean(false);
 
+    /** Per-partition leader-election controllers, keyed by partition 
(0..partitionCount-1). */
+    private final Map<Integer, LeaderElection<TcLeader>> elections = new 
ConcurrentHashMap<>();
+    /** The local broker's election value — what we propose for every 
partition we lead. */
+    private volatile TcLeader localLeader;
+    /** Open assignment-watch listeners (one per watching client connection). 
*/
+    private final List<Runnable> assignmentChangeListeners = new 
CopyOnWriteArrayList<>();
+
     public TransactionCoordinatorV5(PulsarService pulsar) {
         this.pulsar = pulsar;
         this.txnStore = new TxnMetadataStore(pulsar.getLocalMetadataStore());
         var config = pulsar.getConfiguration();
+        this.partitionCount = 
config.getTransactionCoordinatorScalableTopicsParallelism();
         this.timeoutSweepIntervalMs = TimeUnit.SECONDS.toMillis(
                 
config.getTransactionCoordinatorScalableTopicsTimeoutSweepIntervalSeconds());
         this.gcSweepIntervalMs = TimeUnit.SECONDS.toMillis(
@@ -115,14 +142,32 @@ public class TransactionCoordinatorV5 {
     // ---- Lifecycle --------------------------------------------------------
 
     /**
-     * Start the periodic timeout / GC sweeps on a dedicated single-thread 
scheduler. Each tick is
-     * gated by {@link #ifElectedSweeper} so only the partition-0 owner does 
the scan. Idempotent —
-     * a second call is ignored.
+     * Start the coordinator: create a per-partition {@link LeaderElection} 
and {@code elect()} on
+     * every partition (elect-all), then start the periodic timeout / GC 
sweeps on a dedicated
+     * single-thread scheduler. Sweep ticks are gated by {@link 
#ifElectedSweeper} so only the
+     * partition-0 leader scans. Idempotent — a second call is ignored.
      */
     public synchronized void start() {
         if (closed || sweepExecutor != null) {
             return;
         }
+        verifyParallelismConsistency();
+        this.localLeader = new TcLeader(pulsar.getBrokerId(), 
pulsar.getBrokerServiceUrl(),
+                pulsar.getBrokerServiceUrlTls(), 
pulsar.getSafeWebServiceAddress());
+        for (int partition = 0; partition < partitionCount; partition++) {
+            final int p = partition;
+            LeaderElection<TcLeader> election = 
pulsar.getCoordinationService().getLeaderElection(
+                    TcLeader.class, TxnPaths.tcLeaderPath(p), state -> 
onElectionStateChange(p, state));
+            elections.put(p, election);
+            // elect-all: become a candidate for every partition so leadership 
is balanced across
+            // brokers and every partition has standbys for failover. Errors 
are logged; the
+            // LeaderElection retries internally.
+            election.elect(localLeader).exceptionally(ex -> {
+                log.warn().attr("partition", p).exception(ex).log("v5 TC 
initial elect failed");
+                return null;
+            });
+        }
+
         sweepExecutor = Executors.newSingleThreadScheduledExecutor(
                 new DefaultThreadFactory("pulsar-txn-v5-sweep"));
         sweepExecutor.scheduleWithFixedDelay(
@@ -133,13 +178,87 @@ public class TransactionCoordinatorV5 {
                 gcSweepIntervalMs, gcSweepIntervalMs, TimeUnit.MILLISECONDS);
     }
 
-    /** Stop the sweeps. Idempotent. */
+    /**
+     * Persist this broker's configured parallelism cluster-wide on first start, and verify every
+     * subsequent broker agrees. A mismatch means brokers would run different election sets and
+     * the coordinator-count encoded in transaction ids would be ambiguous — fatal
+     * misconfiguration, so we fail fast rather than start in an inconsistent state.
+     *
+     * <p>A mismatch is fatal on every path, including the fallback re-read taken after a lost
+     * create race. Only a failure to read or write the metadata store (or an unparsable stored
+     * value) is tolerated: it is logged and startup proceeds.
+     *
+     * @throws IllegalStateException if the value stored in the metadata store differs from this
+     *                               broker's configured parallelism
+     */
+    private void verifyParallelismConsistency() {
+        var store = pulsar.getLocalMetadataStore();
+        try {
+            var stored = store.get(TxnPaths.TXN_TC_PARALLELISM_PATH).get();
+            if (stored.isEmpty()) {
+                // First broker to start writes the value (CAS create; a racing peer that wins
+                // makes this put fail, which is resolved by the re-read in the catch below).
+                byte[] value = Integer.toString(partitionCount)
+                        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+                store.put(TxnPaths.TXN_TC_PARALLELISM_PATH, value, java.util.Optional.of(-1L)).get();
+                stored = store.get(TxnPaths.TXN_TC_PARALLELISM_PATH).get();
+            }
+            stored.ifPresent(r -> checkParallelismMatches(r.getValue()));
+        } catch (IllegalStateException e) {
+            throw e;
+        } catch (Exception e) {
+            // A racing create (BadVersion) or read-after-write resolves by re-reading and
+            // comparing.
+            boolean interrupted = e instanceof InterruptedException;
+            try {
+                var after = store.get(TxnPaths.TXN_TC_PARALLELISM_PATH).get();
+                after.ifPresent(r -> checkParallelismMatches(r.getValue()));
+            } catch (IllegalStateException mismatch) {
+                // The peer that won the race stored a different value: this is exactly the
+                // misconfiguration the check exists for, so it must not be swallowed.
+                throw mismatch;
+            } catch (Exception retryFailure) {
+                interrupted |= retryFailure instanceof InterruptedException;
+                // Keep the first failure attached so neither cause is lost from the log.
+                retryFailure.addSuppressed(e);
+                log.warn().exception(retryFailure)
+                        .log("Could not verify TC parallelism consistency; proceeding");
+            } finally {
+                if (interrupted) {
+                    // Restore the interrupt status consumed by the blocking get() calls.
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
+    }
+
+    /**
+     * Compares the parallelism stored in the metadata store with this broker's configured
+     * {@code partitionCount}.
+     *
+     * @param storedValue the stored value: a decimal integer encoded as UTF-8 text
+     * @throws IllegalStateException if the stored value differs from {@code partitionCount}
+     * @throws NumberFormatException if the stored text is not a parsable integer
+     */
+    private void checkParallelismMatches(byte[] storedValue) {
+        String storedText = new String(storedValue, java.nio.charset.StandardCharsets.UTF_8).trim();
+        int stored = Integer.parseInt(storedText);
+        if (stored == partitionCount) {
+            return;
+        }
+        String message = "transactionCoordinatorScalableTopicsParallelism mismatch: this broker is configured"
+                + " with " + partitionCount
+                + " but the cluster was initialized with " + stored
+                + ". The value is fixed at cluster bring-up and must be identical on every"
+                + " broker.";
+        throw new IllegalStateException(message);
+    }
+
+    /** Stop the sweeps and release every leader-election lease. Idempotent. */
     public synchronized void close() {
         closed = true;
         if (sweepExecutor != null) {
             sweepExecutor.shutdownNow();
             sweepExecutor = null;
         }
+        elections.values().forEach(e -> e.asyncClose().exceptionally(ex -> {
+            log.warn().exception(ex).log("v5 TC election close failed");
+            return null;
+        }));
+        elections.clear();
+        assignmentChangeListeners.clear();
+    }
+
+    /**
+     * Reports whether the local election for TC partition {@code partition} is currently in
+     * the {@code Leading} state. Used to gate client-connect acceptance and the sweep.
+     *
+     * @param partition the TC partition index
+     * @return {@code true} only if an election is registered for the partition and its state is
+     *         {@link LeaderElectionState#Leading}; {@code false} when there is no local election
+     *         for it (out-of-range partition, or before {@code start()})
+     */
+    public boolean isLeaderFor(int partition) {
+        LeaderElection<TcLeader> election = elections.get(partition);
+        if (election == null) {
+            return false;
+        }
+        return election.getState() == LeaderElectionState.Leading;
+    }
+
+    /**
+     * Election callback: notifies every registered assignment-change listener that the
+     * partition-to-leader map may have moved. A listener that throws is logged and does not
+     * prevent the remaining listeners from running.
+     *
+     * @param partition the partition whose election changed state
+     * @param state     the new election state (only logged)
+     */
+    private void onElectionStateChange(int partition, LeaderElectionState state) {
+        log.debug().attr("partition", partition).attr("state", state).log("v5 TC election state changed");
+        assignmentChangeListeners.forEach(listener -> {
+            try {
+                listener.run();
+            } catch (Throwable t) {
+                log.warn().exception(t).log("v5 TC assignment listener failed");
+            }
+        });
+    }
 
     /**
@@ -174,17 +293,75 @@ public class TransactionCoordinatorV5 {
     // ---- TC client connect ------------------------------------------------
 
     /**
-     * Verify this broker is the leader for {@code tcId} (owns the 
corresponding partition of
-     * {@code transaction_coordinator_assign}). Mirrors the ownership check 
the legacy
-     * {@code TransactionMetadataStoreService.handleTcClientConnect} performs 
— the same
-     * topic-ownership mechanism serves as our leader-election surface.
+     * Verify this broker may coordinate {@code tcId}. A new client reaches us 
via the assignment
+     * watch, so we are the metadata-store election leader for that partition. 
An old client (no
+     * assignment-watch support) reaches us via an assign-topic lookup, so we 
own that partition's
+     * assign-topic bundle. Accept either: both are correctness-safe because 
every transaction
+     * state transition is a metadata-store CAS, so even a stale router can't 
corrupt state. We
+     * accept-if-leader first (cheap, in-memory) and fall back to the 
assign-topic ownership check
+     * only when we're not the election leader.
      */
     public CompletableFuture<Void> 
handleClientConnect(TransactionCoordinatorID tcId) {
+        if (isLeaderFor((int) tcId.getId())) {
+            return CompletableFuture.completedFuture(null);
+        }
         String assignPartition = 
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
                 .getPartition((int) tcId.getId()).toString();
         return 
pulsar.getBrokerService().checkTopicNsOwnership(assignPartition);
     }
 
+    // ---- Assignment discovery (client watch) ------------------------------
+
+    /**
+     * Assemble the full {@code partition → leader} map from the per-partition elections.
+     *
+     * <p>Each leader is read through the asynchronous {@link LeaderElection#getLeaderValue()}
+     * instead of the cache-only {@code getLeaderValueIfPresent()}. Right after this broker
+     * turns {@code Following} for a partition, its local cache for the new leader's node may
+     * not be repopulated yet; a cache-only read would then silently drop that partition, while
+     * the async read loads from the metadata store on a cache miss.
+     *
+     * <p>A partition whose leader value is absent, or whose load fails, is left out of the map;
+     * the caller ({@code ServerCnx}) re-pushes shortly afterwards so the client isn't stranded.
+     * The result is always the whole map — the watch protocol sends full snapshots, never diffs.
+     *
+     * @return a future of the snapshot, which also reports whether every partition has a leader
+     */
+    public CompletableFuture<TcAssignmentsSnapshot> buildAssignmentsSnapshot() {
+        // Filled concurrently from the completion callbacks of the individual loads.
+        Map<Integer, TcLeader> leadersByPartition = new ConcurrentSkipListMap<>();
+        List<CompletableFuture<Void>> pendingLoads = new ArrayList<>(elections.size());
+        elections.forEach((partitionKey, election) -> {
+            int partition = partitionKey;
+            CompletableFuture<Void> load = election.getLeaderValue()
+                    .thenAccept(maybeLeader ->
+                            maybeLeader.ifPresent(leader -> leadersByPartition.put(partition, leader)))
+                    .exceptionally(ex -> {
+                        // A failed load means "leader unknown for now"; the re-push retries.
+                        log.debug().attr("partition", partition).exception(ex)
+                                .log("v5 TC leader-value load failed while building snapshot");
+                        return null;
+                    });
+            pendingLoads.add(load);
+        });
+        return FutureUtil.waitForAll(pendingLoads).thenApply(
+                ignored -> new TcAssignmentsSnapshot(partitionCount, new TreeMap<>(leadersByPartition)));
+    }
+
+    /**
+     * Register a listener fired whenever the assignment map may have changed 
(any partition's
+     * leadership moved). Returns an {@link AutoCloseable} that deregisters it 
— the
+     * {@code ServerCnx} closes it when the client closes the watch or 
disconnects.
+     */
+    public AutoCloseable registerAssignmentChangeListener(Runnable listener) {
+        assignmentChangeListeners.add(listener);
+        return () -> assignmentChangeListeners.remove(listener);
+    }
+
+    /**
+     * Immutable full assignment snapshot: partition count + the currently-known leaders.
+     *
+     * <p>The constructor takes a sorted, unmodifiable copy of {@code assignments}, so later
+     * changes to the caller's map are not visible through the snapshot and the map returned
+     * by {@link #assignments()} cannot be modified.
+     *
+     * @param partitionCount the total number of TC partitions
+     * @param assignments    the known leader per partition; partitions without a known leader
+     *                       are absent
+     */
+    public record TcAssignmentsSnapshot(int partitionCount, Map<Integer, TcLeader> assignments) {
+        /** Defensive copy: a record component is otherwise only as immutable as the map passed in. */
+        public TcAssignmentsSnapshot {
+            assignments = Collections.unmodifiableMap(new TreeMap<>(assignments));
+        }
+
+        /** @return true if every partition has a known leader (no mid-election gaps). */
+        public boolean isComplete() {
+            return assignments.size() == partitionCount;
+        }
+    }
+
     // ---- newTransaction ---------------------------------------------------
 
     /**
@@ -459,21 +636,15 @@ public class TransactionCoordinatorV5 {
     }
 
     /**
-     * Run {@code action} only on the elected sweeper — the broker that owns 
partition 0 of
-     * {@code transaction_coordinator_assign}. Not owning it (or any error 
checking ownership) means
-     * "skip this cycle". Correctness doesn't depend on the election: every 
transition is a header
-     * CAS, so a stale owner sweeping concurrently is harmless.
+     * Run {@code action} only on the elected sweeper — the broker that leads 
TC partition 0. Not
+     * leading it means "skip this cycle". Correctness doesn't depend on the 
election: every
+     * transition is a header CAS, so a stale leader sweeping concurrently is 
harmless.
      */
     private CompletableFuture<Void> 
ifElectedSweeper(Supplier<CompletableFuture<Void>> action) {
-        if (closed) {
+        if (closed || !isLeaderFor(0)) {
             return CompletableFuture.completedFuture(null);
         }
-        String assignPartition0 = 
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
-                .getPartition(0).toString();
-        return 
pulsar.getBrokerService().checkTopicNsOwnership(assignPartition0)
-                .handle((v, ex) -> ex == null)
-                .thenCompose(owned -> (owned && !closed)
-                        ? action.get() : 
CompletableFuture.completedFuture(null));
+        return action.get();
     }
 
     /** A {@code (segment, subscription)} ack participant; keys the ack 
fan-out de-dup set. */
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcLeader.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcLeader.java
new file mode 100644
index 00000000000..cefba83d7c9
--- /dev/null
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TcLeader.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.metadata;
+
+/**
+ * Value stored in a per-partition transaction-coordinator leader-election node
+ * ({@code /txn/tc/leader/<partition>}). Identifies the broker currently 
coordinating that TC
+ * partition and carries the connection URLs a client needs to reach it — so 
any broker can
+ * answer a client's assignment watch from the election value alone, without a 
further lookup.
+ *
+ * <p>Serialized as JSON via {@link 
org.apache.pulsar.common.util.ObjectMapperFactory} by the
+ * coordination service's {@code LeaderElection} serde.
+ *
+ * @param brokerId            the elected broker's id (matches the {@code 
/loadbalance/brokers} key)
+ * @param brokerServiceUrl    the broker's binary service URL (non-TLS); may 
be {@code null} if the
+ *                            broker only advertises a TLS endpoint
+ * @param brokerServiceUrlTls the broker's binary service URL (TLS); may be 
{@code null} if TLS is
+ *                            disabled
+ * @param webServiceUrl       the broker's HTTP service URL, for admin/CLI 
resolution
+ */
+public record TcLeader(String brokerId, String brokerServiceUrl, String 
brokerServiceUrlTls,
+                       String webServiceUrl) {
+}
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
index 0c5bbcfd972..e94d0b17053 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/metadata/TxnPaths.java
@@ -107,6 +107,28 @@ public final class TxnPaths {
         return TXN_TC_SEQ_PREFIX + "/" + tcId;
     }
 
+    /**
+     * Path prefix for the per-partition transaction-coordinator 
leader-election nodes. Each
+     * partition {@code N} has a {@code LeaderElection} under {@code 
/txn/tc/leader/<N>} whose
+     * value is the {@link TcLeader} currently coordinating that partition. 
Replaces the
+     * {@code transaction_coordinator_assign} topic as the v5 TC's election 
surface — election
+     * rests on the metadata store directly, not on topic/bundle ownership.
+     */
+    public static final String TXN_TC_LEADER_PREFIX = "/txn/tc/leader";
+
+    /** @return {@code /txn/tc/leader/<partition>} — the leader-election node 
for {@code partition}. */
+    public static String tcLeaderPath(int partition) {
+        return TXN_TC_LEADER_PREFIX + "/" + partition;
+    }
+
+    /**
+     * Cluster-wide record of the scalable-topics TC parallelism, written once 
by the first broker to
+     * start. Every broker verifies its configured value against this and 
refuses to start on a
+     * mismatch, so the coordinator-count encoded in transaction ids stays 
stable for the cluster's
+     * lifetime.
+     */
+    public static final String TXN_TC_PARALLELISM_PATH = "/txn/tc/parallelism";
+
     /** Width used when formatting long values into 
lexicographically-orderable index keys. */
     public static final int LONG_KEY_WIDTH = 20;
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
index c5f261e85ae..9db63021dd9 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/v5/TransactionCoordinatorV5Test.java
@@ -39,7 +39,9 @@ import org.apache.pulsar.broker.transaction.metadata.TxnState;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.TxnAction;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.coordination.CoordinationService;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
+import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import 
org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
 import org.awaitility.Awaitility;
@@ -60,6 +62,7 @@ public class TransactionCoordinatorV5Test {
     private TxnMetadataStore txnStore;
     private PulsarService pulsar;
     private BrokerService brokerService;
+    private CoordinationService coordinationService;
     private TransactionCoordinatorV5 tc;
 
     @BeforeMethod
@@ -67,17 +70,30 @@ public class TransactionCoordinatorV5Test {
         store = MetadataStoreExtended.create("memory:local",
                 MetadataStoreConfig.builder().fsyncEnable(false).build());
         txnStore = new TxnMetadataStore(store);
+        coordinationService = new CoordinationServiceImpl(store);
         pulsar = mock(PulsarService.class);
         when(pulsar.getLocalMetadataStore()).thenReturn(store);
+        when(pulsar.getCoordinationService()).thenReturn(coordinationService);
+        when(pulsar.getBrokerId()).thenReturn("broker-test:8080");
+        
when(pulsar.getBrokerServiceUrl()).thenReturn("pulsar://broker-test:6650");
+        when(pulsar.getBrokerServiceUrlTls()).thenReturn(null);
+        
when(pulsar.getSafeWebServiceAddress()).thenReturn("http://broker-test:8080");
         ServiceConfiguration cfg = new ServiceConfiguration();
         // GC sweep tests assume retention has already elapsed.
         cfg.setTransactionCoordinatorScalableTopicsGcRetentionSeconds(0);
+        // Keep the election small so start() converges quickly in unit tests.
+        cfg.setTransactionCoordinatorScalableTopicsParallelism(4);
         when(pulsar.getConfiguration()).thenReturn(cfg);
         brokerService = mock(BrokerService.class);
         when(pulsar.getBrokerService()).thenReturn(brokerService);
-        // Default: owned. Tests that want to assert the not-owned path can 
override.
+        // Default: owned (assign-topic fallback path in handleClientConnect). 
Tests that want to
+        // assert the not-owned path can override.
         
when(brokerService.checkTopicNsOwnership(any())).thenReturn(CompletableFuture.completedFuture(null));
         tc = new TransactionCoordinatorV5(pulsar);
+        tc.start();
+        // As the only broker, we win every partition's election; wait until 
partition 0 is led so
+        // the sweep-gating and client-connect paths behave deterministically.
+        Awaitility.await().until(() -> tc.isLeaderFor(0));
     }
 
     @AfterMethod(alwaysRun = true)
@@ -85,6 +101,9 @@ public class TransactionCoordinatorV5Test {
         if (tc != null) {
             tc.close();
         }
+        if (coordinationService != null) {
+            coordinationService.close();
+        }
         if (store != null) {
             store.close();
         }
@@ -320,16 +339,89 @@ public class TransactionCoordinatorV5Test {
     }
 
     @Test
-    public void sweeps_skipWhenNotElected() throws Exception {
-        // Override the owned-default with a failure → not the elected sweeper 
→ action skipped.
-        when(brokerService.checkTopicNsOwnership(any())).thenReturn(
-                CompletableFuture.failedFuture(new RuntimeException("not 
owner")));
-
+    public void sweeps_skipWhenNotLeader() throws Exception {
+        // Create an expired txn, then drop leadership (close releases the 
election leases). The
+        // sweep is gated on isLeaderFor(0), so on a fresh non-leader TC it 
must skip.
         TxnID expired = tc.newTransaction(TC_ID, 1L, "owner").get();
-        tc.sweepTimeouts().get();
+        tc.close();
+
+        // A second TC that never started (no elections) is not the leader for 
partition 0.
+        TransactionCoordinatorV5 notLeader = new 
TransactionCoordinatorV5(pulsar);
+        try {
+            notLeader.sweepTimeouts().get();
+            // Still OPEN — the sweep never ran because this TC leads no 
partition.
+            var header = 
txnStore.getHeader(TxnIds.toKey(expired)).get().orElseThrow();
+            assertThat(header.value().getState()).isEqualTo(TxnState.OPEN);
+        } finally {
+            notLeader.close();
+        }
+    }
 
-        // Still OPEN — the sweep never ran because we don't own 
assign-partition 0.
-        var header = 
txnStore.getHeader(TxnIds.toKey(expired)).get().orElseThrow();
-        assertThat(header.value().getState()).isEqualTo(TxnState.OPEN);
+    // ---- Election + assignment discovery ----------------------------------
+
+    @Test
+    public void election_singleBrokerLeadsAllPartitions() {
+        // As the only broker, this TC wins every partition's election.
+        for (int p = 0; p < 4; p++) {
+            final int partition = p;
+            Awaitility.await().until(() -> tc.isLeaderFor(partition));
+        }
+        assertThat(tc.isLeaderFor(4)).isFalse(); // out of range (parallelism 
= 4)
+    }
+
+    @Test
+    public void buildAssignmentsSnapshot_reportsAllLedPartitions() {
+        
assertThat(tc.buildAssignmentsSnapshot().join().partitionCount()).isEqualTo(4);
+        Awaitility.await().untilAsserted(() -> {
+            var snap = tc.buildAssignmentsSnapshot().join();
+            assertThat(snap.assignments()).hasSize(4);
+            assertThat(snap.isComplete()).isTrue();
+            assertThat(snap.assignments().get(0).brokerServiceUrl())
+                    .isEqualTo("pulsar://broker-test:6650");
+            
assertThat(snap.assignments().get(0).brokerId()).isEqualTo("broker-test:8080");
+        });
+    }
+
+    @Test
+    public void registerAssignmentChangeListener_deregistersOnClose() throws 
Exception {
+        // The handle deregisters the listener; after close() it must not be 
invoked again. We only
+        // assert the registration/deregistration contract here — the 
fire-on-election-change path
+        // needs a multi-broker setup and is covered at integration level.
+        AutoCloseable handle = tc.registerAssignmentChangeListener(() -> { });
+        handle.close();
+    }
+
+    @Test
+    public void handleClientConnect_acceptsWhenLeader() throws Exception {
+        // We lead partition 0, so connect is accepted without consulting 
assign-topic ownership.
+        tc.handleClientConnect(TC_ID).get();
+    }
+
+    @Test
+    public void start_failsOnParallelismMismatch() throws Exception {
+        // The running tc (from setUp) persisted parallelism=4. A second 
coordinator configured with a
+        // different value against the same metadata store must refuse to 
start.
+        ServiceConfiguration mismatchCfg = new ServiceConfiguration();
+        
mismatchCfg.setTransactionCoordinatorScalableTopicsGcRetentionSeconds(0);
+        mismatchCfg.setTransactionCoordinatorScalableTopicsParallelism(8);
+        PulsarService other = mock(PulsarService.class);
+        when(other.getLocalMetadataStore()).thenReturn(store);
+        when(other.getCoordinationService()).thenReturn(coordinationService);
+        when(other.getConfiguration()).thenReturn(mismatchCfg);
+        when(other.getBrokerId()).thenReturn("broker-other:8080");
+        
when(other.getBrokerServiceUrl()).thenReturn("pulsar://broker-other:6650");
+        
when(other.getSafeWebServiceAddress()).thenReturn("http://broker-other:8080");
+        when(other.getBrokerService()).thenReturn(brokerService);
+
+        TransactionCoordinatorV5 mismatched = new 
TransactionCoordinatorV5(other);
+        try {
+            assertThatThrownBy(mismatched::start)
+                    .isInstanceOf(IllegalStateException.class)
+                    .hasMessageContaining("mismatch")
+                    .hasMessageContaining("8")
+                    .hasMessageContaining("4");
+        } finally {
+            mismatched.close();
+        }
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
index ecd0479d085..e63542cd060 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java
@@ -138,10 +138,7 @@ public class TransactionClientConnectTest extends 
TransactionTestBase {
     @Test
     public void testPulsarClientCloseThenCloseTcClient() throws Exception {
         TransactionCoordinatorClientImpl transactionCoordinatorClient = 
((PulsarClientImpl) pulsarClient).getTcClient();
-        Field field = 
TransactionCoordinatorClientImpl.class.getDeclaredField("handlers");
-        field.setAccessible(true);
-        TransactionMetaStoreHandler[] handlers =
-                (TransactionMetaStoreHandler[]) 
field.get(transactionCoordinatorClient);
+        java.util.Collection<TransactionMetaStoreHandler> handlers = 
transactionCoordinatorClient.getHandlers();
 
         for (TransactionMetaStoreHandler handler : handlers) {
             handler.newTransactionAsync(10, TimeUnit.SECONDS).get();
@@ -168,11 +165,8 @@ public class TransactionClientConnectTest extends 
TransactionTestBase {
     public void testHandlerStateChangeToReady() throws Exception {
         TransactionCoordinatorClientImpl transactionCoordinatorClient =
                 ((PulsarClientImpl) pulsarClient).getTcClient();
-        Field field = 
TransactionCoordinatorClientImpl.class.getDeclaredField("handlers");
-        field.setAccessible(true);
-        TransactionMetaStoreHandler[] handlers =
-                (TransactionMetaStoreHandler[]) 
field.get(transactionCoordinatorClient);
-        TransactionMetaStoreHandler transactionMetaStoreHandler = handlers[0];
+        TransactionMetaStoreHandler transactionMetaStoreHandler =
+                transactionCoordinatorClient.getHandlers().iterator().next();
         
Assert.assertEquals(transactionMetaStoreHandler.getConnectHandleState(), 
HandlerState.State.Ready);
         Assert.assertTrue(transactionMetaStoreHandler.changeToReadyState());
     }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index ff12f01b40f..16d78585531 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -190,6 +190,12 @@ public class ClientCnx extends PulsarHandler {
                     .concurrencyLevel(1)
                     .build();
 
+    private final ConcurrentLongHashMap<TcAssignmentsWatcherSession> 
tcAssignmentsWatchers =
+            ConcurrentLongHashMap.<TcAssignmentsWatcherSession>newBuilder()
+                    .expectedItems(2)
+                    .concurrencyLevel(1)
+                    .build();
+
     private final CompletableFuture<Void> connectionFuture = new 
CompletableFuture<Void>();
     private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new 
ConcurrentLinkedQueue<>();
 
@@ -238,6 +244,8 @@ public class ClientCnx extends PulsarHandler {
     private boolean supportsTopicWatcherReconcile;
     @Getter
     private boolean supportsScalableTopics;
+    @Getter
+    private boolean supportsTcMetadataDiscovery;
 
     /** Idle stat. **/
     @Getter
@@ -393,6 +401,7 @@ public class ClientCnx extends PulsarHandler {
         dagWatchSessions.forEach((__, session) -> session.connectionClosed());
         scalableConsumerSessions.forEach((__, session) -> 
session.connectionClosed());
         scalableTopicsWatchers.forEach((__, session) -> 
session.connectionClosed());
+        tcAssignmentsWatchers.forEach((__, session) -> 
session.connectionClosed());
 
         waitingLookupRequests.clear();
 
@@ -402,6 +411,7 @@ public class ClientCnx extends PulsarHandler {
         dagWatchSessions.clear();
         scalableConsumerSessions.clear();
         scalableTopicsWatchers.clear();
+        tcAssignmentsWatchers.clear();
 
         timeoutTask.cancel(true);
     }
@@ -458,6 +468,8 @@ public class ClientCnx extends PulsarHandler {
             connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTopicWatcherReconcile();
         supportsScalableTopics =
             connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsScalableTopics();
+        supportsTcMetadataDiscovery =
+            connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTcMetadataDiscovery();
 
         // set remote protocol version to the correct version before we 
complete the connection future
         setRemoteEndpointProtocolVersion(connected.getProtocolVersion());
@@ -1516,6 +1528,62 @@ public class ClientCnx extends PulsarHandler {
         scalableTopicsWatchers.remove(watchId);
     }
 
+    /** Client-side receiver for transaction-coordinator assignment snapshots. 
*/
+    public interface TcAssignmentsWatcherSession {
+        void onSnapshot(int parallelism, java.util.Map<Long, String[]> 
leaders);
+
+        void onError(org.apache.pulsar.common.api.proto.ServerError error, 
String message);
+
+        void connectionClosed();
+    }
+
+    public void registerTcAssignmentsWatcher(long watchId, 
TcAssignmentsWatcherSession watcher) {
+        tcAssignmentsWatchers.put(watchId, watcher);
+    }
+
+    public void removeTcAssignmentsWatcher(long watchId) {
+        tcAssignmentsWatchers.remove(watchId);
+    }
+
+    @Override
+    protected void handleCommandWatchTcAssignmentsUpdate(
+            org.apache.pulsar.common.api.proto.CommandWatchTcAssignmentsUpdate 
cmd) {
+        checkArgument(state == State.Ready);
+        long watchId = cmd.getWatchId();
+        log.debug().attr("watchId", watchId).log("Received 
WatchTcAssignmentsUpdate");
+
+        if (cmd.hasError()) {
+            TcAssignmentsWatcherSession session = 
tcAssignmentsWatchers.remove(watchId);
+            if (session != null) {
+                session.onError(cmd.getError(), cmd.hasMessage() ? 
cmd.getMessage() : null);
+            } else {
+                log.warn().attr("watchId", watchId)
+                        .log("Received TC-assignments watch error for unknown 
watcher");
+            }
+            return;
+        }
+
+        TcAssignmentsWatcherSession session = 
tcAssignmentsWatchers.get(watchId);
+        if (session == null) {
+            log.warn().attr("watchId", watchId)
+                    .log("Received TC-assignments watch update for unknown 
watcher");
+            return;
+        }
+        if (!cmd.hasSnapshot()) {
+            log.warn().attr("watchId", watchId).log("TC-assignments update 
with no snapshot payload");
+            return;
+        }
+        var snapshot = cmd.getSnapshot();
+        java.util.Map<Long, String[]> leaders = new java.util.HashMap<>();
+        for (int i = 0; i < snapshot.getAssignmentsCount(); i++) {
+            var a = snapshot.getAssignmentAt(i);
+            leaders.put(a.getTcId(), new String[] {
+                    a.hasBrokerServiceUrl() ? a.getBrokerServiceUrl() : null,
+                    a.hasBrokerServiceUrlTls() ? a.getBrokerServiceUrlTls() : 
null});
+        }
+        session.onSnapshot(snapshot.getParallelism(), leaders);
+    }
+
     /**
      * check serverError and take appropriate action.
      * <ul>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index 4a704be7e47..e398b0583fa 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -58,6 +58,10 @@ public class ConnectionHandler {
     protected final int randomKeyForSelectConnection;
 
     private volatile Boolean useProxy;
+    // The explicit target broker for connections that bypass topic lookup (v5 
TC metadata-store
+    // discovery). Remembered so the error-retry path (reconnectLater) 
re-dials the same leader
+    // instead of falling back to the service URL. Null means "use the normal 
lookup path".
+    private volatile URI explicitHostURI;
 
     interface Connection {
 
@@ -96,6 +100,18 @@ public class ConnectionHandler {
         grabCnx(Optional.empty());
     }
 
+    /**
+     * Connect to a specific broker {@code hostURI}, routing through the proxy 
when {@code useProxy}
+     * is true (logical = the broker, physical = the proxy) or directly 
otherwise. Used by the v5
+     * transaction coordinator's metadata-store discovery, where the elected 
leader's address is
+     * known but, behind a proxy, isn't directly reachable.
+     */
+    protected void grabCnx(URI hostURI, boolean useProxy) {
+        this.useProxy = useProxy;
+        this.explicitHostURI = hostURI;
+        grabCnx(Optional.of(hostURI));
+    }
+
     protected void grabCnx(Optional<URI> hostURI) {
         if (!duringConnect.compareAndSet(false, true)) {
             log.info().log("Skip grabbing the connection since there is a 
pending connection");
@@ -189,7 +205,15 @@ public class ConnectionHandler {
         if (state.changeToConnecting()) {
             state.client.timer().newTimeout(timeout -> {
                 log.info("Reconnecting after connection was closed");
-                grabCnx();
+                // Re-dial the explicit leader target (v5 TC discovery) if 
set; otherwise the normal
+                // lookup path. Without this, a first-attempt failure during 
failover would fall back
+                // to the service URL and never reach the partition's new 
leader.
+                URI target = explicitHostURI;
+                if (target != null) {
+                    grabCnx(Optional.of(target));
+                } else {
+                    grabCnx();
+                }
             }, delayMs, TimeUnit.MILLISECONDS);
         } else {
             log.info("Ignoring reconnection request");
@@ -203,6 +227,9 @@ public class ConnectionHandler {
     public void connectionClosed(ClientCnx cnx, Optional<Long> 
initialConnectionDelayMs, Optional<URI> hostUrl) {
         lastConnectionClosedTimestamp = System.currentTimeMillis();
         duringConnect.set(false);
+        // Remember an explicit reconnect target so a later first-attempt 
failure (reconnectLater)
+        // re-dials the same broker rather than falling back to the service 
URL.
+        hostUrl.ifPresent(uri -> this.explicitHostURI = uri);
         state.client.getCnxPool().releaseConnection(cnx);
         if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
             if (!state.changeToConnecting()) {
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
index e9f14a06768..ef604cd4d19 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HandlerState.java
@@ -55,6 +55,13 @@ abstract class HandlerState {
 
     protected void setRedirectedClusterURI(String serviceUrl, String 
serviceUrlTls) throws URISyntaxException {
         String url = client.conf.isUseTls() && 
StringUtils.isNotBlank(serviceUrlTls) ? serviceUrlTls : serviceUrl;
+        if (StringUtils.isBlank(url)) {
+            // e.g. a non-TLS client given a TLS-only endpoint (or vice 
versa). Surface a clear,
+            // catchable error rather than letting new URI(null) throw an NPE.
+            throw new URISyntaxException(String.valueOf(url),
+                    "No usable service URL (useTls=" + client.conf.isUseTls()
+                            + ", serviceUrl=" + serviceUrl + ", 
serviceUrlTls=" + serviceUrlTls + ")");
+        }
         this.redirectedClusterURI = new URI(url);
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
index f2731c6b74b..ae52cfd0550 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java
@@ -27,8 +27,10 @@ import io.netty.util.Timer;
 import io.netty.util.TimerTask;
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.URI;
 import java.time.Duration;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
@@ -90,11 +92,37 @@ public class TransactionMetaStoreHandler extends 
HandlerState
     private final long lookupDeadline;
     private final AtomicInteger previousExceptionCount = new AtomicInteger();
 
+    // Metadata-store discovery (watch mode): the elected leader broker for 
this coordinator and
+    // whether it must be reached through the proxy. Null leaderUri means 
assign-topic mode.
+    private volatile URI leaderUri;
+    private volatile boolean useProxy;
+
 
 
     public TransactionMetaStoreHandler(long transactionCoordinatorId, 
PulsarClientImpl pulsarClient, String topic,
                                        CompletableFuture<Void> connectFuture) {
+        this(transactionCoordinatorId, pulsarClient, topic, null, false, 
connectFuture);
+    }
+
+    /**
+     * Construct a handler that connects to a fixed leader broker 
(metadata-store discovery) rather
+     * than resolving a coordinator via an assign-topic lookup. The leader 
address is dialled
+     * through the proxy when {@code useProxy} is true (the broker URL isn't 
directly reachable
+     * behind a proxy) or directly otherwise. Use {@link #retargetLeader} when 
the elected leader
+     * changes.
+     */
+    public TransactionMetaStoreHandler(long transactionCoordinatorId, 
PulsarClientImpl pulsarClient,
+                                       URI leaderUri, boolean useProxy,
+                                       CompletableFuture<Void> connectFuture) {
+        this(transactionCoordinatorId, pulsarClient, null, leaderUri, 
useProxy, connectFuture);
+    }
+
+    private TransactionMetaStoreHandler(long transactionCoordinatorId, 
PulsarClientImpl pulsarClient, String topic,
+                                        URI leaderUri, boolean useProxy,
+                                        CompletableFuture<Void> connectFuture) 
{
         super(pulsarClient, topic);
+        this.leaderUri = leaderUri;
+        this.useProxy = useProxy;
         this.transactionCoordinatorId = transactionCoordinatorId;
         this.timeoutQueue = new ConcurrentLinkedQueue<>();
         this.blockIfReachMaxPendingOps = true;
@@ -117,7 +145,13 @@ public class TransactionMetaStoreHandler extends 
HandlerState
     }
 
     public void start() {
-        this.connectionHandler.grabCnx();
+        if (leaderUri != null) {
+            // Metadata-store discovery: dial the elected leader (through the 
proxy if needed).
+            this.connectionHandler.grabCnx(leaderUri, useProxy);
+        } else {
+            // Assign-topic discovery: resolve the coordinator via a topic 
lookup.
+            this.connectionHandler.grabCnx();
+        }
     }
 
     @Override
@@ -799,8 +833,35 @@ public class TransactionMetaStoreHandler extends 
HandlerState
         return this.connectionHandler.cnx();
     }
 
+    /**
+     * Point this handler at a (possibly new) elected leader broker and 
reconnect. Called by the
+     * metadata-store discovery when an assignment snapshot moves this 
coordinator's leadership to a
+     * different broker. If the leader and proxy-mode are unchanged and the 
handler is already
+     * connected, this is a no-op; otherwise it (re)connects to the new leader.
+     */
+    public void retargetLeader(URI newLeaderUri, boolean newUseProxy) {
+        if (newLeaderUri.equals(this.leaderUri) && newUseProxy == 
this.useProxy && cnx() != null) {
+            return;
+        }
+        this.leaderUri = newLeaderUri;
+        this.useProxy = newUseProxy;
+        ClientCnx current = cnx();
+        if (current != null) {
+            // Drop the current connection; connectionClosed re-grabs against 
the new leader.
+            current.channel().close();
+        } else {
+            connectionHandler.grabCnx(newLeaderUri, newUseProxy);
+        }
+    }
+
     void connectionClosed(ClientCnx cnx) {
-        this.connectionHandler.connectionClosed(cnx);
+        if (leaderUri != null) {
+            // Metadata-store discovery: reconnect to the elected leader (via 
the proxy if needed),
+            // not the configured service URL. useProxy is preserved on the 
ConnectionHandler.
+            this.connectionHandler.connectionClosed(cnx, Optional.empty(), 
Optional.of(leaderUri));
+        } else {
+            this.connectionHandler.connectionClosed(cnx);
+        }
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/AssignTopicTcDiscovery.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/AssignTopicTcDiscovery.java
new file mode 100644
index 00000000000..442bd0b79a9
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/AssignTopicTcDiscovery.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.transaction;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.CustomLog;
+import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
+import org.apache.pulsar.client.util.MathUtils;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
+
+/**
+ * Coordinator discovery via the {@code transaction_coordinator_assign} partitioned topic — the
+ * original mechanism. Each coordinator is a partition of the assign topic; the handler connects to
+ * the broker that owns that partition's bundle (resolved by an ordinary topic lookup). Used against
+ * brokers that don't advertise {@code supports_tc_metadata_discovery}.
+ *
+ * <p>The handler array is published through a {@code volatile} field only after every slot has
+ * been filled, and each accessor works on a local snapshot of that field. {@link #nextHandler()},
+ * {@link #handlers()} and {@link #close()} can therefore run concurrently without one of them
+ * seeing the field turn {@code null} between a check and a use.
+ */
+@CustomLog
+class AssignTopicTcDiscovery implements TcDiscovery {
+
+    private final PulsarClientImpl pulsarClient;
+    /**
+     * Handlers indexed by coordinator id. {@code null} until {@link #start()} has created all of
+     * them, and {@code null} again after {@link #close()}.
+     */
+    private volatile TransactionMetaStoreHandler[] handlers;
+    private final ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap =
+            ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
+                    .expectedItems(16)
+                    .concurrencyLevel(1)
+                    .build();
+    private final AtomicLong epoch = new AtomicLong(0);
+
+    AssignTopicTcDiscovery(PulsarClientImpl pulsarClient) {
+        this.pulsarClient = pulsarClient;
+    }
+
+    /**
+     * Looks up the partition count of the assign topic and starts one handler per partition.
+     *
+     * @return a future that completes when every handler's connect future has completed, or fails
+     *     with a {@link TransactionCoordinatorClientException} when the topic has no partitions
+     */
+    @Override
+    public CompletableFuture<Void> start() {
+        return pulsarClient.getPartitionedTopicMetadata(
+                        SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), true, false)
+                .thenCompose(partitionMeta -> {
+                    log.debug().attr("partitions", partitionMeta.partitions)
+                            .log("Transaction meta store assign partition is.");
+                    if (partitionMeta.partitions <= 0) {
+                        return FutureUtil.failedFuture(new TransactionCoordinatorClientException(
+                                "The broker doesn't enable the transaction coordinator, "
+                                        + "or the transaction coordinator has not initialized"));
+                    }
+                    List<CompletableFuture<Void>> connectFutureList = new ArrayList<>(partitionMeta.partitions);
+                    TransactionMetaStoreHandler[] created =
+                            new TransactionMetaStoreHandler[partitionMeta.partitions];
+                    for (int i = 0; i < created.length; i++) {
+                        CompletableFuture<Void> connectFuture = new CompletableFuture<>();
+                        connectFutureList.add(connectFuture);
+                        TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(
+                                i, pulsarClient, getTCAssignTopicName(i), connectFuture);
+                        created[i] = handler;
+                        handlerMap.put(i, handler);
+                        handler.start();
+                    }
+                    // Publish only once every slot is filled, so a concurrent nextHandler() never
+                    // picks an empty slot out of a half-built array.
+                    handlers = created;
+                    return FutureUtil.waitForAll(connectFutureList);
+                });
+    }
+
+    private static String getTCAssignTopicName(int partition) {
+        return SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
+                + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
+    }
+
+    @Override
+    public TransactionMetaStoreHandler handlerForCoordinator(long tcId) {
+        return handlerMap.get(tcId);
+    }
+
+    @Override
+    public TransactionMetaStoreHandler nextHandler() {
+        // Work on one snapshot: close() may null the field at any time.
+        TransactionMetaStoreHandler[] snapshot = handlers;
+        if (snapshot == null || snapshot.length == 0) {
+            return null;
+        }
+        int index = MathUtils.signSafeMod(epoch.incrementAndGet(), snapshot.length);
+        return snapshot[index];
+    }
+
+    @Override
+    public java.util.Collection<TransactionMetaStoreHandler> handlers() {
+        TransactionMetaStoreHandler[] snapshot = handlers;
+        return snapshot == null ? List.of() : List.of(snapshot);
+    }
+
+    /**
+     * Closes every handler created by {@link #start()}. A failure to close one handler is logged
+     * and does not prevent the remaining handlers from being closed.
+     */
+    @Override
+    public void close() {
+        TransactionMetaStoreHandler[] snapshot = handlers;
+        // Unpublish first so nextHandler() stops handing out handlers that are being closed.
+        handlers = null;
+        if (snapshot == null) {
+            return;
+        }
+        for (TransactionMetaStoreHandler handler : snapshot) {
+            try {
+                handler.close();
+            } catch (IOException e) {
+                log.warn().exception(e).log("Close transaction meta store handler error");
+            }
+        }
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TcDiscovery.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TcDiscovery.java
new file mode 100644
index 00000000000..476823b2868
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TcDiscovery.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.transaction;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
+
+/**
+ * Strategy for locating transaction-coordinator instances and giving the
+ * {@link TransactionCoordinatorClientImpl} a {@link TransactionMetaStoreHandler} per coordinator.
+ *
+ * <p>The discriminator is the per-connection {@code supports_tc_metadata_discovery} feature flag,
+ * read once at {@link TransactionCoordinatorClientImpl#startAsync()}:
+ * <ul>
+ *   <li>{@link AssignTopicTcDiscovery} — the original mechanism: discover coordinators via a
+ *       lookup on the {@code transaction_coordinator_assign} partitioned topic. Used against
+ *       brokers that don't advertise the flag (legacy/v4 coordinator, or v5 coordinator
+ *       disabled).</li>
+ *   <li>{@link WatchTcAssignmentsDiscovery} — the metadata-store election mechanism: open one
+ *       assignment watch and point each handler at its partition's elected leader broker. Used
+ *       against brokers that advertise the flag.</li>
+ * </ul>
+ *
+ * <p>The handler-routing surface ({@code newTransaction} round-robin, {@code commit}/{@code abort}
+ * by {@code TxnID.mostSigBits}) is shared and lives in {@link TransactionCoordinatorClientImpl};
+ * only coordinator <em>location</em> differs between strategies.
+ */
+interface TcDiscovery extends AutoCloseable {
+
+    /**
+     * Discover the coordinators and create their handlers. When the returned future completes
+     * depends on the strategy: {@link AssignTopicTcDiscovery} completes it once every handler has
+     * connected to its coordinator, while {@link WatchTcAssignmentsDiscovery} completes it as soon
+     * as the first assignment snapshot has been applied, without waiting for the individual
+     * handlers to finish connecting.
+     *
+     * @return a future that completes when discovery is ready, or fails if it could not start
+     */
+    CompletableFuture<Void> start();
+
+    /**
+     * @return the handler for coordinator {@code tcId} (= {@code TxnID.mostSigBits}), or
+     *     {@code null} if no such coordinator exists in the current assignment.
+     */
+    TransactionMetaStoreHandler handlerForCoordinator(long tcId);
+
+    /**
+     * @return the handler for the next transaction, chosen round-robin across coordinators, or
+     *     {@code null} if there are no coordinators available.
+     */
+    TransactionMetaStoreHandler nextHandler();
+
+    /** @return all current coordinator handlers. Visible for testing. */
+    Collection<TransactionMetaStoreHandler> handlers();
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
index 91a0fa5c6ec..6fe938d2d62 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java
@@ -18,13 +18,12 @@
  */
 package org.apache.pulsar.client.impl.transaction;
 
-import java.io.IOException;
-import java.util.ArrayList;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import lombok.CustomLog;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -34,28 +33,23 @@ import 
org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientExce
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
-import org.apache.pulsar.client.util.MathUtils;
 import org.apache.pulsar.common.api.proto.Subscription;
 import org.apache.pulsar.common.api.proto.TxnAction;
-import org.apache.pulsar.common.naming.SystemTopicNames;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 
 /**
- * Transaction coordinator client based topic assigned.
+ * Transaction coordinator client. Coordinator <em>location</em> is delegated 
to a {@link TcDiscovery}
+ * strategy chosen from the broker's {@code supports_tc_metadata_discovery} 
feature flag at
+ * {@link #startAsync()}: {@link WatchTcAssignmentsDiscovery} when the broker 
advertises the
+ * metadata-store election, else {@link AssignTopicTcDiscovery} (the 
assign-topic mechanism). The
+ * routing surface here ({@code newTransaction} round-robin, {@code 
commit}/{@code abort} by
+ * {@code TxnID.mostSigBits}) is the same for both.
  */
 @CustomLog
 public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorClient {
 
     private final PulsarClientImpl pulsarClient;
-    private TransactionMetaStoreHandler[] handlers;
-    private ConcurrentLongHashMap<TransactionMetaStoreHandler> handlerMap =
-            ConcurrentLongHashMap.<TransactionMetaStoreHandler>newBuilder()
-                    .expectedItems(16)
-                    .concurrencyLevel(1)
-                    .build();
-    private final AtomicLong epoch = new AtomicLong(0);
+    private volatile TcDiscovery discovery;
 
     private static final 
AtomicReferenceFieldUpdater<TransactionCoordinatorClientImpl, State> 
STATE_UPDATER =
             
AtomicReferenceFieldUpdater.newUpdater(TransactionCoordinatorClientImpl.class, 
State.class, "state");
@@ -77,42 +71,50 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
     @Override
     public CompletableFuture<Void> startAsync() {
         if (STATE_UPDATER.compareAndSet(this, State.NONE, State.STARTING)) {
-            return pulsarClient.getPartitionedTopicMetadata(
-                            
SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName(), 
true, false)
-                .thenCompose(partitionMeta -> {
-                    List<CompletableFuture<Void>> connectFutureList = new 
ArrayList<>();
-                        log.debug().attr("partitions", 
partitionMeta.partitions)
-                                .log("Transaction meta store assign partition 
is.");
-                    if (partitionMeta.partitions > 0) {
-                        handlers = new 
TransactionMetaStoreHandler[partitionMeta.partitions];
-                        for (int i = 0; i < partitionMeta.partitions; i++) {
-                            CompletableFuture<Void> connectFuture = new 
CompletableFuture<>();
-                            connectFutureList.add(connectFuture);
-                            TransactionMetaStoreHandler handler = new 
TransactionMetaStoreHandler(
-                                    i, pulsarClient, getTCAssignTopicName(i), 
connectFuture);
-                            handlers[i] = handler;
-                            handlerMap.put(i, handler);
-                            handler.start();
-                        }
-                    } else {
-                        return FutureUtil.failedFuture(new 
TransactionCoordinatorClientException(
-                                "The broker doesn't enable the transaction 
coordinator, "
-                                        + "or the transaction coordinator has 
not initialized"));
-                    }
-
-                    STATE_UPDATER.set(TransactionCoordinatorClientImpl.this, 
State.READY);
-
-                    return FutureUtil.waitForAll(connectFutureList);
-                });
+            return selectDiscovery()
+                    .thenCompose(selected -> {
+                        this.discovery = selected;
+                        log.info().attr("discovery", 
selected.getClass().getSimpleName())
+                                .log("Transaction coordinator discovery 
selected");
+                        return selected.start();
+                    })
+                    .thenRun(() -> STATE_UPDATER.set(this, State.READY));
         } else {
             return FutureUtil.failedFuture(
                     new CoordinatorClientStateException("Can not start while 
current state is " + state));
         }
     }
 
-    private String getTCAssignTopicName(int partition) {
-        return SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN
-                + TopicName.PARTITIONED_TOPIC_SUFFIX + partition;
+    /**
+     * Choose the discovery strategy. The metadata-store assignment watch 
needs a binary-protocol
+     * connection, so it's only usable when the client is configured with a 
{@code pulsar://}
+     * service URL; with an {@code http://} service URL we always use the 
assign-topic flow (which
+     * resolves coordinators via the admin/HTTP-capable partitioned-metadata 
lookup). When binary
+     * lookup is available, probe the broker's {@code 
supports_tc_metadata_discovery} feature flag to
+     * decide; if the broker doesn't advertise it (old broker, or 
scalable-topics TC disabled), fall
+     * back to the assign-topic flow.
+     */
+    private CompletableFuture<TcDiscovery> selectDiscovery() {
+        if (!pulsarClient.getLookup().isBinaryProtoLookupService()) {
+            return CompletableFuture.completedFuture(new 
AssignTopicTcDiscovery(pulsarClient));
+        }
+        // Probe a broker connection to read the feature flag. Use 
getAnyBrokerProxyConnection() (not
+        // getConnectionToServiceUrl()): when connecting through a proxy, the 
latter yields the proxy's
+        // own CONNECTED, which carries the proxy lookup handshake's flags 
rather than a broker's;
+        // getAnyBrokerProxyConnection() pairs to an actual broker (directly 
or proxied) so the
+        // forwarded feature flags reflect the broker — the same connection 
the watch itself uses.
+        // If the probe fails, fall back to the assign-topic flow, whose 
lookup retries across hosts
+        // and still works against v5 brokers (the assign topic exists during 
the deprecation window),
+        // so falling back is always safe.
+        return pulsarClient.getAnyBrokerProxyConnection()
+                .thenApply(cnx -> cnx.isSupportsTcMetadataDiscovery()
+                        ? (TcDiscovery) new 
WatchTcAssignmentsDiscovery(pulsarClient)
+                        : new AssignTopicTcDiscovery(pulsarClient))
+                .exceptionally(ex -> {
+                    log.info().exception(ex)
+                            .log("TC discovery feature probe failed; using 
assign-topic discovery");
+                    return new AssignTopicTcDiscovery(pulsarClient);
+                });
     }
 
     @Override
@@ -131,16 +133,14 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
             log.warn("The transaction meta store is closing or closed, doing 
nothing.");
             result.complete(null);
         } else {
-            if (handlers != null) {
-                for (TransactionMetaStoreHandler handler : handlers) {
-                    try {
-                        handler.close();
-                    } catch (IOException e) {
-                        log.warn().exception(e).log("Close transaction meta 
store handler error");
-                    }
+            if (discovery != null) {
+                try {
+                    discovery.close();
+                } catch (Exception e) {
+                    log.warn().exception(e).log("Close transaction coordinator 
discovery error");
                 }
+                discovery = null;
             }
-            this.handlers = null;
             result.complete(null);
         }
         return result;
@@ -171,7 +171,12 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
 
     @Override
     public CompletableFuture<TxnID> newTransactionAsync(long timeout, TimeUnit unit) {
-        return nextHandler().newTransactionAsync(timeout, unit);
+        // Read the volatile field once. It is null until startAsync() has selected a strategy and
+        // is nulled again by closeAsync(), so dereferencing it directly can throw a
+        // NullPointerException instead of returning a failed future.
+        TcDiscovery current = discovery;
+        if (current == null) {
+            return FutureUtil.failedFuture(new CoordinatorClientStateException(
+                    "Transaction coordinator client is not ready, current state is " + state));
+        }
+        TransactionMetaStoreHandler handler = current.nextHandler();
+        if (handler == null) {
+            return FutureUtil.failedFuture(new TransactionCoordinatorClientException(
+                    "No transaction coordinator is currently available"));
+        }
+        return handler.newTransactionAsync(timeout, unit);
     }
 
     @Override
@@ -186,7 +191,7 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
 
     @Override
     public CompletableFuture<Void> addPublishPartitionToTxnAsync(TxnID txnID, 
List<String> partitions) {
-        TransactionMetaStoreHandler handler = 
handlerMap.get(txnID.getMostSigBits());
+        TransactionMetaStoreHandler handler = 
discovery.handlerForCoordinator(txnID.getMostSigBits());
         if (handler == null) {
             return FutureUtil.failedFuture(
                     new 
TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(
@@ -207,7 +212,7 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
 
     @Override
     public CompletableFuture<Void> addSubscriptionToTxnAsync(TxnID txnID, 
String topic, String subscription) {
-        TransactionMetaStoreHandler handler = 
handlerMap.get(txnID.getMostSigBits());
+        TransactionMetaStoreHandler handler = 
discovery.handlerForCoordinator(txnID.getMostSigBits());
         if (handler == null) {
             return FutureUtil.failedFuture(
                     new 
TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(
@@ -230,7 +235,7 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
 
     @Override
     public CompletableFuture<Void> commitAsync(TxnID txnID) {
-        TransactionMetaStoreHandler handler = 
handlerMap.get(txnID.getMostSigBits());
+        TransactionMetaStoreHandler handler = 
discovery.handlerForCoordinator(txnID.getMostSigBits());
         if (handler == null) {
             return FutureUtil.failedFuture(
                     new 
TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(
@@ -250,7 +255,7 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
 
     @Override
     public CompletableFuture<Void> abortAsync(TxnID txnID) {
-        TransactionMetaStoreHandler handler = 
handlerMap.get(txnID.getMostSigBits());
+        TransactionMetaStoreHandler handler = 
discovery.handlerForCoordinator(txnID.getMostSigBits());
         if (handler == null) {
             return FutureUtil.failedFuture(
                     new 
TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException(
@@ -264,8 +269,19 @@ public class TransactionCoordinatorClientImpl implements 
TransactionCoordinatorC
         return state;
     }
 
-    private TransactionMetaStoreHandler nextHandler() {
-        int index = MathUtils.signSafeMod(epoch.incrementAndGet(), 
handlers.length);
-        return handlers[index];
+    /**
+     * @return the current coordinator handlers, or an empty collection when no discovery strategy
+     *     is active (not started yet, or already closed). Visible for testing.
+     */
+    @VisibleForTesting
+    public Collection<TransactionMetaStoreHandler> getHandlers() {
+        // Single volatile read: closeAsync() may null the field between a null check and the call.
+        TcDiscovery current = discovery;
+        return current == null ? List.of() : current.handlers();
+    }
+
+    /**
+     * @return {@code true} if coordinator discovery uses the metadata-store 
assignment watch (rather
+     *     than the assign-topic fallback). Visible for testing so integration 
tests can assert the
+     *     watch path was actually exercised.
+     */
+    @VisibleForTesting
+    public boolean isUsingMetadataDiscovery() {
+        return discovery instanceof WatchTcAssignmentsDiscovery;
     }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/WatchTcAssignmentsDiscovery.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/WatchTcAssignmentsDiscovery.java
new file mode 100644
index 00000000000..e1d8609edf1
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/WatchTcAssignmentsDiscovery.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.transaction;
+
+import java.net.URI;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.CustomLog;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TransactionMetaStoreHandler;
+import org.apache.pulsar.client.util.MathUtils;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.util.Backoff;
+
+/**
+ * Coordinator discovery via the metadata-store leader election. Opens a single
+ * {@code CommandWatchTcAssignments} watch on a service-URL connection; the 
broker replies with the
+ * full {@code partition -> leader} snapshot and re-pushes it on every 
leadership change. Each
+ * coordinator gets one {@link TransactionMetaStoreHandler} pointed at its 
elected leader broker —
+ * dialled through the proxy when the watch connection is proxied, directly 
otherwise (no
+ * per-coordinator lookup); when a snapshot moves a coordinator's leader, the 
handler is retargeted.
+ * Used against brokers that advertise {@code supports_tc_metadata_discovery}.
+ */
+@CustomLog
+class WatchTcAssignmentsDiscovery implements TcDiscovery, 
ClientCnx.TcAssignmentsWatcherSession {
+
+    private static final AtomicLong WATCH_ID_GENERATOR = new AtomicLong(0);
+
+    private final PulsarClientImpl pulsarClient;
+    private final long watchId = WATCH_ID_GENERATOR.incrementAndGet();
+    private final Backoff reconnectBackoff;
+
+    private final Map<Long, TransactionMetaStoreHandler> handlers = new 
ConcurrentHashMap<>();
+    private final AtomicLong epoch = new AtomicLong(0);
+    private volatile int parallelism;
+
+    private final CompletableFuture<Void> initialSnapshotFuture = new 
CompletableFuture<>();
+    private volatile ClientCnx cnx;
+    private volatile boolean closed;
+    private volatile long initialOpenDeadline;
+    private volatile boolean useProxy;
+
+    WatchTcAssignmentsDiscovery(PulsarClientImpl pulsarClient) {
+        this.pulsarClient = pulsarClient;
+        this.reconnectBackoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(100))
+                .maxBackoff(Duration.ofSeconds(30))
+                .build();
+    }
+
+    @Override
+    public CompletableFuture<Void> start() {
+        // Bound initial-open retries by the client's lookup timeout (fall 
back to operation timeout
+        // when unset, mirroring TransactionMetaStoreHandler). A transient 
failure on the watch broker
+        // (e.g. ServiceNotReady while its TC initializes) then retries rather 
than hard-failing
+        // transaction-client startup.
+        long lookupTimeoutMs = 
pulsarClient.getConfiguration().getLookupTimeoutMs();
+        if (lookupTimeoutMs < 0) {
+            lookupTimeoutMs = 
pulsarClient.getConfiguration().getOperationTimeoutMs();
+        }
+        this.initialOpenDeadline = System.currentTimeMillis() + 
lookupTimeoutMs;
+        openWatch();
+        return initialSnapshotFuture;
+    }
+
+    private void openWatch() {
+        if (closed) {
+            return;
+        }
+        pulsarClient.getAnyBrokerProxyConnection()
+                .thenAccept(this::attach)
+                .exceptionally(ex -> {
+                    onAttachFailure(ex);
+                    return null;
+                });
+    }
+
+    private void attach(ClientCnx newCnx) {
+        if (closed) {
+            return;
+        }
+        if (!newCnx.isSupportsTcMetadataDiscovery()) {
+            // The broker we landed on doesn't support the watch. On the very 
first open this is a
+            // hard failure (the caller chose this strategy on a probe that 
said it was supported);
+            // after that, it's likely transient — config drift or we hit a 
different/old broker —
+            // so reconnect to find a supporting broker rather than freezing 
on the last snapshot.
+            onAttachFailure(new PulsarClientException(
+                    "Broker does not support metadata-store TC discovery"));
+            return;
+        }
+        this.cnx = newCnx;
+        // Behind a proxy the leader's advertised broker address isn't 
directly reachable; handlers
+        // must dial it through the proxy. The watch connection tells us which 
mode we're in.
+        this.useProxy = newCnx.isProxied();
+        newCnx.registerTcAssignmentsWatcher(watchId, this);
+        newCnx.ctx().writeAndFlush(Commands.newWatchTcAssignments(watchId))
+                .addListener(writeFuture -> {
+                    if (!writeFuture.isSuccess()) {
+                        newCnx.removeTcAssignmentsWatcher(watchId);
+                        onAttachFailure(writeFuture.cause());
+                    }
+                });
+    }
+
+    private void onAttachFailure(Throwable ex) {
+        if (closed) {
+            return;
+        }
+        if (!initialSnapshotFuture.isDone()) {
+            // During initial open, retry retryable failures until the lookup 
deadline rather than
+            // failing transaction-client startup outright — the probe and the 
watch can land on
+            // different brokers, and the watch broker may be briefly 
not-ready.
+            Throwable cause = ex instanceof 
java.util.concurrent.CompletionException && ex.getCause() != null
+                    ? ex.getCause() : ex;
+            boolean retryable = !(cause instanceof PulsarClientException)
+                    || 
PulsarClientException.isRetriableError((PulsarClientException) cause);
+            if (retryable && System.currentTimeMillis() < initialOpenDeadline) 
{
+                log.warn().exception(cause).log("TC-assignments watch open 
failed; retrying");
+                scheduleReconnect();
+                return;
+            }
+            initialSnapshotFuture.completeExceptionally(
+                    PulsarClientException.wrap(ex, "Failed to open 
TC-assignments watch"));
+            return;
+        }
+        scheduleReconnect();
+    }
+
+    /**
+     * Applies a full {@code partition -> leader} snapshot pushed by the broker.
+     *
+     * <p>Creates and starts a handler for each coordinator seen for the first time and retargets
+     * existing handlers at the leader named in the snapshot. An entry whose leader URL is missing
+     * or malformed is logged and skipped; the remaining entries are still applied and the initial
+     * snapshot future is still completed.
+     *
+     * @param newParallelism number of coordinators, used as the round-robin modulus
+     * @param leaders per-coordinator leader URLs, keyed by coordinator id; index 0 of each value is
+     *     the plain service URL and index 1 the TLS service URL
+     */
+    @Override
+    public void onSnapshot(int newParallelism, Map<Long, String[]> leaders) {
+        if (closed) {
+            return;
+        }
+        reconnectBackoff.reset();
+        this.parallelism = newParallelism;
+        // Apply the full snapshot: create handlers for newly-seen coordinators, retarget existing
+        // ones whose leader moved. A coordinator absent from the snapshot is mid-election; leave its
+        // handler in place to retry against its last-known leader until the next snapshot.
+        boolean proxy = this.useProxy;
+        for (Map.Entry<Long, String[]> e : leaders.entrySet()) {
+            long tcId = e.getKey();
+            try {
+                // Resolve the URL inside the try: selectLeaderUri (and the array indexing) throws
+                // for a missing or malformed entry, and that must only skip this one partition.
+                String[] urls = e.getValue();
+                URI leaderUri = selectLeaderUri(urls[0], urls[1]);
+                handlers.compute(tcId, (id, existing) -> {
+                    if (existing == null) {
+                        TransactionMetaStoreHandler handler = new TransactionMetaStoreHandler(
+                                id, pulsarClient, leaderUri, proxy, new CompletableFuture<>());
+                        handler.start();
+                        return handler;
+                    }
+                    existing.retargetLeader(leaderUri, proxy);
+                    return existing;
+                });
+            } catch (RuntimeException ex) {
+                // A bad/unusable leader URL for one partition (e.g. a TLS-only leader for a non-TLS
+                // client) must not abort applying the rest of the snapshot or tear down the watch.
+                log.warn().attr("tcId", tcId).exception(ex)
+                        .log("Skipping TC assignment with unusable leader URL");
+            }
+        }
+        if (!initialSnapshotFuture.isDone()) {
+            initialSnapshotFuture.complete(null);
+        }
+    }
+
+    @Override
+    public void onError(ServerError error, String message) {
+        log.warn().attr("error", error).attr("message", 
message).log("WatchTcAssignments rejected");
+        if (!initialSnapshotFuture.isDone()) {
+            initialSnapshotFuture.completeExceptionally(new 
PulsarClientException(
+                    "WatchTcAssignments failed: " + error + (message != null ? 
" - " + message : "")));
+            return;
+        }
+        // Post-initial error: ClientCnx has already removed this session, so 
no connectionClosed()
+        // will fire to drive recovery. Reconnect ourselves so a transient 
rejection (ServiceNotReady
+        // while a broker re-initializes, or transient config drift) can't 
freeze the watch forever.
+        cnx = null;
+        scheduleReconnect();
+    }
+
+    @Override
+    public void connectionClosed() {
+        cnx = null;
+        if (closed) {
+            return;
+        }
+        if (!initialSnapshotFuture.isDone()) {
+            initialSnapshotFuture.completeExceptionally(new 
PulsarClientException(
+                    "Connection closed before initial TC-assignments snapshot 
arrived"));
+            return;
+        }
+        scheduleReconnect();
+    }
+
+    private void scheduleReconnect() {
+        if (closed) {
+            return;
+        }
+        long delayMs = reconnectBackoff.next().toMillis();
+        pulsarClient.timer().newTimeout(timeout -> openWatch(), delayMs, 
TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Pick the leader URL matching the client's TLS setting and parse it. 
Throws if no usable URL is
+     * present (e.g. a non-TLS client and a TLS-only leader) — the caller 
skips that partition rather
+     * than tearing down the watch.
+     */
+    private URI selectLeaderUri(String url, String urlTls) {
+        boolean tls = pulsarClient.getConfiguration().isUseTls();
+        String chosen = tls && urlTls != null && !urlTls.isBlank() ? urlTls : 
url;
+        if (chosen == null || chosen.isBlank()) {
+            throw new IllegalArgumentException("No usable leader URL (useTls=" 
+ tls
+                    + ", url=" + url + ", urlTls=" + urlTls + ")");
+        }
+        return URI.create(chosen);
+    }
+
+    @Override
+    public TransactionMetaStoreHandler handlerForCoordinator(long tcId) {
+        return handlers.get(tcId);
+    }
+
+    @Override
+    public TransactionMetaStoreHandler nextHandler() {
+        int n = parallelism;
+        if (n <= 0) {
+            return null;
+        }
+        // Round-robin over coordinator ids 0..parallelism-1, skipping any 
mid-election gap.
+        for (int attempt = 0; attempt < n; attempt++) {
+            long tcId = MathUtils.signSafeMod(epoch.incrementAndGet(), n);
+            TransactionMetaStoreHandler handler = handlers.get(tcId);
+            if (handler != null) {
+                return handler;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public java.util.Collection<TransactionMetaStoreHandler> handlers() {
+        return new java.util.ArrayList<>(handlers.values());
+    }
+
+    @Override
+    public void close() {
+        closed = true;
+        ClientCnx c = cnx;
+        if (c != null) {
+            c.removeTcAssignmentsWatcher(watchId);
+            
c.ctx().writeAndFlush(Commands.newWatchTcAssignmentsClose(watchId));
+        }
+        handlers.values().forEach(handler -> {
+            try {
+                handler.close();
+            } catch (Exception e) {
+                log.warn().exception(e).log("Close transaction meta store 
handler error");
+            }
+        });
+        handlers.clear();
+    }
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index e6cb2605315..3b8307dcfcb 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -313,6 +313,13 @@ public class Commands {
 
     public static BaseCommand newConnectedCommand(int clientProtocolVersion, 
int maxMessageSize,
                                                   boolean 
supportsTopicWatchers, boolean supportsScalableTopics) {
+        return newConnectedCommand(clientProtocolVersion, maxMessageSize, 
supportsTopicWatchers,
+                supportsScalableTopics, false);
+    }
+
+    public static BaseCommand newConnectedCommand(int clientProtocolVersion, 
int maxMessageSize,
+                                                  boolean 
supportsTopicWatchers, boolean supportsScalableTopics,
+                                                  boolean 
supportsTcMetadataDiscovery) {
         BaseCommand cmd = localCmd(Type.CONNECTED);
         CommandConnected connected = cmd.setConnected()
                 .setServerVersion("Pulsar Server" + 
PulsarVersion.getVersion());
@@ -333,6 +340,7 @@ public class Commands {
         connected.setFeatureFlags().setSupportsReplDedupByLidAndEid(true);
         
connected.setFeatureFlags().setSupportsTopicWatcherReconcile(supportsTopicWatchers);
         
connected.setFeatureFlags().setSupportsScalableTopics(supportsScalableTopics);
+        
connected.setFeatureFlags().setSupportsTcMetadataDiscovery(supportsTcMetadataDiscovery);
         return cmd;
     }
 
@@ -342,6 +350,12 @@ public class Commands {
                 supportsScalableTopics));
     }
 
+    public static ByteBuf newConnected(int clientProtocolVersion, int 
maxMessageSize, boolean supportsTopicWatchers,
+                                       boolean supportsScalableTopics, boolean 
supportsTcMetadataDiscovery) {
+        return serializeWithSize(newConnectedCommand(clientProtocolVersion, 
maxMessageSize, supportsTopicWatchers,
+                supportsScalableTopics, supportsTcMetadataDiscovery));
+    }
+
     public static ByteBuf newAuthChallenge(String authMethod, AuthData 
brokerData, int clientProtocolVersion) {
         BaseCommand cmd = localCmd(Type.AUTH_CHALLENGE);
         CommandAuthChallenge challenge = cmd.setAuthChallenge();
@@ -1869,6 +1883,58 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
+    // --- Transaction-coordinator assignment watch ---
+
+    /** Client -> Broker: open the TC-assignment watch. */
+    public static ByteBuf newWatchTcAssignments(long watchId) {
+        BaseCommand cmd = localCmd(Type.WATCH_TC_ASSIGNMENTS);
+        cmd.setWatchTcAssignments().setWatchId(watchId);
+        return serializeWithSize(cmd);
+    }
+
+    /** Client -> Broker: close the TC-assignment watch. */
+    public static ByteBuf newWatchTcAssignmentsClose(long watchId) {
+        BaseCommand cmd = localCmd(Type.WATCH_TC_ASSIGNMENTS_CLOSE);
+        cmd.setWatchTcAssignmentsClose().setWatchId(watchId);
+        return serializeWithSize(cmd);
+    }
+
+    /**
+     * Broker -> Client: emit the full {@code partition -> leader} snapshot. Sent on initial watch
+     * and again, in full, on every leadership change. A partition currently mid-election is simply
+     * absent from {@code leaders}; the client parks transactions routed there until a later
+     * snapshot fills it in. URLs in a leader entry may be {@code null} (broker advertises only one
+     * of plaintext / TLS).
+     *
+     * @param leaders partition -> {brokerServiceUrl, brokerServiceUrlTls}
+     */
+    public static ByteBuf newWatchTcAssignmentsSnapshot(long watchId, int 
parallelism,
+            java.util.Map<Integer, String[]> leaders) {
+        BaseCommand cmd = new 
BaseCommand().setType(Type.WATCH_TC_ASSIGNMENTS_UPDATE);
+        var snapshot = cmd.setWatchTcAssignmentsUpdate().setWatchId(watchId)
+                .setSnapshot().setParallelism(parallelism);
+        for (var entry : leaders.entrySet()) {
+            String[] urls = entry.getValue();
+            var assignment = snapshot.addAssignment().setTcId(entry.getKey());
+            if (urls[0] != null) {
+                assignment.setBrokerServiceUrl(urls[0]);
+            }
+            if (urls[1] != null) {
+                assignment.setBrokerServiceUrlTls(urls[1]);
+            }
+        }
+        return serializeWithSize(cmd);
+    }
+
+    public static ByteBuf newWatchTcAssignmentsError(long watchId, ServerError 
error, String message) {
+        BaseCommand cmd = new 
BaseCommand().setType(Type.WATCH_TC_ASSIGNMENTS_UPDATE);
+        cmd.setWatchTcAssignmentsUpdate()
+                .setWatchId(watchId)
+                .setError(error)
+                .setMessage(message);
+        return serializeWithSize(cmd);
+    }
+
     public static ByteBuf serializeWithSize(BaseCommand cmd) {
         return serializeWithPrecalculatedSerializedSize(cmd, 
cmd.getSerializedSize());
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
index bca87683f27..9a412c1def1 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/PulsarDecoder.java
@@ -529,6 +529,21 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
                 
handleCommandWatchScalableTopicsClose(cmd.getWatchScalableTopicsClose());
                 break;
 
+            case WATCH_TC_ASSIGNMENTS:
+                checkArgument(cmd.hasWatchTcAssignments());
+                handleCommandWatchTcAssignments(cmd.getWatchTcAssignments());
+                break;
+
+            case WATCH_TC_ASSIGNMENTS_UPDATE:
+                checkArgument(cmd.hasWatchTcAssignmentsUpdate());
+                
handleCommandWatchTcAssignmentsUpdate(cmd.getWatchTcAssignmentsUpdate());
+                break;
+
+            case WATCH_TC_ASSIGNMENTS_CLOSE:
+                checkArgument(cmd.hasWatchTcAssignmentsClose());
+                
handleCommandWatchTcAssignmentsClose(cmd.getWatchTcAssignmentsClose());
+                break;
+
             default:
                 break;
             }
@@ -839,6 +854,23 @@ public abstract class PulsarDecoder extends 
ChannelInboundHandlerAdapter {
         throw new UnsupportedOperationException();
     }
 
+    protected void handleCommandWatchTcAssignments(
+            org.apache.pulsar.common.api.proto.CommandWatchTcAssignments 
commandWatchTcAssignments) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleCommandWatchTcAssignmentsUpdate(
+            org.apache.pulsar.common.api.proto.CommandWatchTcAssignmentsUpdate
+                    commandWatchTcAssignmentsUpdate) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleCommandWatchTcAssignmentsClose(
+            org.apache.pulsar.common.api.proto.CommandWatchTcAssignmentsClose
+                    commandWatchTcAssignmentsClose) {
+        throw new UnsupportedOperationException();
+    }
+
     private void writeAndFlush(ChannelOutboundInvoker ctx, ByteBuf cmd) {
         NettyChannelUtil.writeAndFlushWithVoidPromise(ctx, cmd);
     }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto 
b/pulsar-common/src/main/proto/PulsarApi.proto
index 28501c8bc66..76cd8d382ab 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -317,6 +317,7 @@ message FeatureFlags {
   optional bool supports_repl_dedup_by_lid_and_eid = 6 [default = false];
   optional bool supports_topic_watcher_reconcile = 7 [default = false];
   optional bool supports_scalable_topics = 8 [default = false];
+  optional bool supports_tc_metadata_discovery = 9 [default = false];
 }
 
 message CommandConnected {
@@ -1013,6 +1014,48 @@ message CommandWatchScalableTopicsClose {
     required uint64 watch_id = 1;
 }
 
+/// --- Transaction-coordinator assignment watch ---
+// The scalable-topics transaction coordinator's discovery surface. The client opens one watch
+// and the broker replies with the full (partition -> leader) map, then pushes a fresh full
+// snapshot whenever any partition's leader changes. There is no point lookup and no diff: the
+// map is bounded (parallelism, ~16) and changes rarely, so always sending the whole snapshot is
+// simpler and removes a class of apply-ordering / drift bugs. Gated by the
+// supports_tc_metadata_discovery feature flag.
+
+// Client -> Broker: open the assignment watch.
+message CommandWatchTcAssignments {
+    required uint64 watch_id = 1;   // client-assigned
+}
+
+// One (partition -> leader) entry. A partition currently mid-election is simply absent from the
+// snapshot; the client parks any transaction routed there until a later snapshot fills it in.
+message TcAssignment {
+    required uint64 tc_id                  = 1;   // TC partition = 
TxnID.mostSigBits
+    optional string broker_service_url     = 2;
+    optional string broker_service_url_tls = 3;
+}
+
+// Full map. Sent on initial watch and again, in full, on every change. The client replaces its
+// local map wholesale — no merge, no ordering rules. parallelism lets the client size its handler
+// array without a separate metadata read.
+message TcAssignmentsSnapshot {
+    required uint32 parallelism       = 1;
+    repeated TcAssignment assignments = 2;
+}
+
+// Broker -> Client: the current full snapshot, or (on initial-watch failure) an error.
+message CommandWatchTcAssignmentsUpdate {
+    required uint64 watch_id                 = 1;
+    optional TcAssignmentsSnapshot snapshot  = 2;
+    optional ServerError error               = 3;
+    optional string message                  = 4;
+}
+
+// Client -> Broker: close the watch.
+message CommandWatchTcAssignmentsClose {
+    required uint64 watch_id = 1;
+}
+
 message CommandGetSchema {
     required uint64 request_id = 1;
     required string topic      = 2;
@@ -1262,6 +1305,10 @@ message BaseCommand {
         WATCH_SCALABLE_TOPICS                 = 76;
         WATCH_SCALABLE_TOPICS_UPDATE          = 77;
         WATCH_SCALABLE_TOPICS_CLOSE           = 78;
+
+        WATCH_TC_ASSIGNMENTS                  = 79;
+        WATCH_TC_ASSIGNMENTS_UPDATE           = 80;
+        WATCH_TC_ASSIGNMENTS_CLOSE            = 81;
     }
 
 
@@ -1357,4 +1404,8 @@ message BaseCommand {
     optional CommandWatchScalableTopics watchScalableTopics                    
       = 76;
     optional CommandWatchScalableTopicsUpdate watchScalableTopicsUpdate        
       = 77;
     optional CommandWatchScalableTopicsClose watchScalableTopicsClose          
       = 78;
+
+    optional CommandWatchTcAssignments watchTcAssignments                      
       = 79;
+    optional CommandWatchTcAssignmentsUpdate watchTcAssignmentsUpdate          
       = 80;
+    optional CommandWatchTcAssignmentsClose watchTcAssignmentsClose            
       = 81;
 }
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsTcAssignmentsTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsTcAssignmentsTest.java
new file mode 100644
index 00000000000..3a3071431a9
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandsTcAssignmentsTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.protocol;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import io.netty.buffer.ByteBuf;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.common.api.proto.BaseCommand;
+import org.apache.pulsar.common.api.proto.ServerError;
+import org.testng.annotations.Test;
+
+/**
+ * Roundtrip tests for the {@code Commands.newWatchTcAssignments*} factory methods: encode a
+ * command, reparse the serialized wire frame, and verify the fields survive the trip.
+ */
+public class CommandsTcAssignmentsTest {
+
+    private static BaseCommand parseFrame(ByteBuf frame) {
+        try {
+            frame.skipBytes(4); // total size
+            int cmdSize = (int) frame.readUnsignedInt();
+            BaseCommand cmd = new BaseCommand();
+            cmd.parseFrom(frame, cmdSize);
+            cmd.materialize();
+            return cmd;
+        } finally {
+            frame.release();
+        }
+    }
+
+    @Test
+    public void testNewWatchTcAssignments() {
+        BaseCommand cmd = parseFrame(Commands.newWatchTcAssignments(7L));
+        assertEquals(cmd.getType(), BaseCommand.Type.WATCH_TC_ASSIGNMENTS);
+        assertTrue(cmd.hasWatchTcAssignments());
+        assertEquals(cmd.getWatchTcAssignments().getWatchId(), 7L);
+    }
+
+    @Test
+    public void testNewWatchTcAssignmentsClose() {
+        BaseCommand cmd = parseFrame(Commands.newWatchTcAssignmentsClose(7L));
+        assertEquals(cmd.getType(), 
BaseCommand.Type.WATCH_TC_ASSIGNMENTS_CLOSE);
+        assertTrue(cmd.hasWatchTcAssignmentsClose());
+        assertEquals(cmd.getWatchTcAssignmentsClose().getWatchId(), 7L);
+    }
+
+    @Test
+    public void testNewWatchTcAssignmentsSnapshot() {
+        Map<Integer, String[]> leaders = new HashMap<>();
+        leaders.put(0, new String[] {"pulsar://b0:6650", 
"pulsar+ssl://b0:6651"});
+        leaders.put(2, new String[] {"pulsar://b2:6650", null}); // partition 
1 mid-election (absent)
+
+        BaseCommand cmd = 
parseFrame(Commands.newWatchTcAssignmentsSnapshot(7L, 3, leaders));
+        assertEquals(cmd.getType(), 
BaseCommand.Type.WATCH_TC_ASSIGNMENTS_UPDATE);
+        assertTrue(cmd.hasWatchTcAssignmentsUpdate());
+        var update = cmd.getWatchTcAssignmentsUpdate();
+        assertEquals(update.getWatchId(), 7L);
+        assertFalse(update.hasError());
+        assertTrue(update.hasSnapshot());
+
+        var snapshot = update.getSnapshot();
+        assertEquals(snapshot.getParallelism(), 3);
+        assertEquals(snapshot.getAssignmentsCount(), 2);
+
+        // Decode into a map for order-independent assertions.
+        Map<Long, String[]> decoded = new HashMap<>();
+        for (int i = 0; i < snapshot.getAssignmentsCount(); i++) {
+            var a = snapshot.getAssignmentAt(i);
+            decoded.put(a.getTcId(), new String[] {
+                    a.hasBrokerServiceUrl() ? a.getBrokerServiceUrl() : null,
+                    a.hasBrokerServiceUrlTls() ? a.getBrokerServiceUrlTls() : 
null});
+        }
+        assertEquals(decoded.get(0L)[0], "pulsar://b0:6650");
+        assertEquals(decoded.get(0L)[1], "pulsar+ssl://b0:6651");
+        assertEquals(decoded.get(2L)[0], "pulsar://b2:6650");
+        assertNull(decoded.get(2L)[1]);
+        assertNull(decoded.get(1L)); // mid-election partition omitted
+    }
+
+    @Test
+    public void testNewWatchTcAssignmentsError() {
+        BaseCommand cmd = parseFrame(
+                Commands.newWatchTcAssignmentsError(7L, 
ServerError.NotAllowedError, "disabled"));
+        assertEquals(cmd.getType(), 
BaseCommand.Type.WATCH_TC_ASSIGNMENTS_UPDATE);
+        var update = cmd.getWatchTcAssignmentsUpdate();
+        assertEquals(update.getWatchId(), 7L);
+        assertTrue(update.hasError());
+        assertEquals(update.getError(), ServerError.NotAllowedError);
+        assertEquals(update.getMessage(), "disabled");
+        assertFalse(update.hasSnapshot());
+    }
+
+    @Test
+    public void testConnectedAdvertisesTcMetadataDiscoveryFlag() {
+        BaseCommand on = parseFrame(Commands.newConnected(
+                /* clientProtocolVersion */ 21, /* maxMessageSize */ 1024,
+                /* supportsTopicWatchers */ true, /* supportsScalableTopics */ 
true,
+                /* supportsTcMetadataDiscovery */ true));
+        
assertTrue(on.getConnected().getFeatureFlags().isSupportsTcMetadataDiscovery());
+
+        BaseCommand off = parseFrame(Commands.newConnected(
+                21, 1024, true, true));
+        
assertFalse(off.getConnected().getFeatureFlags().isSupportsTcMetadataDiscovery());
+    }
+}
diff --git 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 53bd36f704d..70797e2671e 100644
--- 
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ 
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -520,7 +520,9 @@ public class ProxyConnection extends PulsarHandler {
                     connected.hasMaxMessageSize() ? 
connected.getMaxMessageSize() : Commands.INVALID_MAX_MESSAGE_SIZE;
             final ByteBuf msg = 
Commands.newConnected(connected.getProtocolVersion(), maxMessageSize,
                     connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsTopicWatchers(),
-                    connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsScalableTopics());
+                    connected.hasFeatureFlags() && 
connected.getFeatureFlags().isSupportsScalableTopics(),
+                    connected.hasFeatureFlags()
+                            && 
connected.getFeatureFlags().isSupportsTcMetadataDiscovery());
             writeAndFlush(msg);
             // Start auth refresh task only if we are not forwarding 
authorization credentials
             if 
(!service.getConfiguration().isForwardAuthorizationCredentials()) {
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
new file mode 100644
index 00000000000..d9a2997f240
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.transaction;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import lombok.CustomLog;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for the metadata-store transaction-coordinator discovery path (PIP-473 P5.3).
+ *
+ * <p>Verifies, across a real multi-broker docker cluster, that a client discovers coordinators via
+ * the {@code CommandWatchTcAssignments} stream (not the assign-topic lookup) and can drive the
+ * transaction lifecycle, including after the broker leading a coordinator partition is killed.
+ *
+ * @see TcMetadataDiscoveryTestBase for the scope note (lifecycle, not data-in-txn).
+ */
+@CustomLog
+public class TcMetadataDiscoveryTest extends TcMetadataDiscoveryTestBase {
+
+    /**
+     * With the scalable-topics TC enabled, a client opens the assignment watch and can open and
+     * commit / abort transactions across all coordinator partitions. Running many transactions
+     * exercises the round-robin spread across the watch-discovered per-leader connections.
+     */
+    @Test
+    public void transactionLifecycleOverMetadataDiscovery() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .enableTransaction(true)
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .build();
+
+        // Guard against a silent fallback: assert the client actually selected the metadata-store
+        // assignment-watch path. Otherwise a regression that breaks the watch entirely would still
+        // pass, since the assign topic is initialized with the same partition count.
+        
org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl 
tcClient =
+                ((org.apache.pulsar.client.impl.PulsarClientImpl) 
client).getTcClient();
+        assertTrue(tcClient.isUsingMetadataDiscovery(),
+                "client should use metadata-store TC discovery, not the 
assign-topic fallback");
+
+        // Run transactions (commit and abort alternately) until every coordinator partition has
+        // minted at least one — proving the client discovered and connected to each partition's
+        // elected leader. An await loop tolerates the brief startup window where a partition is
+        // still mid-election and absent from the assignment snapshot.
+        Set<Long> coordinatorsExercised = new HashSet<>();
+        final int[] i = {0};
+        Awaitility.await()
+                .atMost(1, TimeUnit.MINUTES)
+                .pollInterval(1, TimeUnit.SECONDS)
+                .until(() -> {
+                    Transaction txn = client.newTransaction()
+                            .withTransactionTimeout(1, TimeUnit.MINUTES)
+                            .build().get();
+                    TxnID txnId = txn.getTxnID();
+                    assertNotNull(txnId);
+                    // mostSigBits is the coordinator (TC partition) that minted the txn.
+                    coordinatorsExercised.add(txnId.getMostSigBits());
+                    if (i[0]++ % 2 == 0) {
+                        txn.commit().get();
+                    } else {
+                        txn.abort().get();
+                    }
+                    return coordinatorsExercised.size() == TC_PARALLELISM;
+                });
+        assertEquals(coordinatorsExercised.size(), TC_PARALLELISM,
+                "expected transactions to be coordinated by every TC 
partition; got "
+                        + coordinatorsExercised);
+    }
+
+    /**
+     * Kill one broker and confirm the client keeps working: the coordinator partitions that broker
+     * was leading are re-elected to the survivor, the client's assignment watch receives the new
+     * snapshot, retargets its handlers, and subsequent transactions across all partitions still
+     * succeed.
+     */
+    @Test
+    public void transactionsSurviveLeaderBrokerFailure() throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .enableTransaction(true)
+                .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                .operationTimeout(30, TimeUnit.SECONDS)
+                .build();
+
+        // Warm up: confirm every coordinator is reachable before the failure.
+        runTxnOnEveryCoordinator(client);
+
+        // Kill one broker — about half the coordinator partitions lose their leader.
+        BrokerContainer victim = pulsarCluster.getBrokers().iterator().next();
+        log.info().attr("broker", victim.getContainerName()).log("Stopping 
broker to force TC failover");
+        victim.stop();
+
+        // After re-election + assignment-watch refresh, transactions across all partitions succeed
+        // again. runTxnOnEveryCoordinator already retries within a bounded wait while leadership and
+        // the client's handlers converge on the new leaders.
+        runTxnOnEveryCoordinator(client);
+    }
+
+    /**
+     * Open + commit one transaction on each coordinator partition; asserts all are covered within a
+     * bounded wait. A coordinator's handler connects asynchronously (and, after a failover, may be
+     * briefly mid-reconnect), so a transaction routed to a not-yet-ready coordinator throws
+     * {@code MetaStoreHandlerNotReadyException} / times out — those are retried rather than failing
+     * the run. The assertion is "every coordinator becomes reachable", not "reachable on the first
+     * attempt".
+     */
+    private void runTxnOnEveryCoordinator(PulsarClient client) {
+        Set<Long> coordinators = new HashSet<>();
+        Awaitility.await()
+                .atMost(90, TimeUnit.SECONDS)
+                .pollInterval(2, TimeUnit.SECONDS)
+                .ignoreExceptions()
+                .until(() -> {
+                    Transaction txn = client.newTransaction()
+                            .withTransactionTimeout(1, TimeUnit.MINUTES)
+                            .build().get();
+                    coordinators.add(txn.getTxnID().getMostSigBits());
+                    txn.commit().get();
+                    return coordinators.size() == TC_PARALLELISM;
+                });
+        assertTrue(coordinators.size() == TC_PARALLELISM,
+                "expected all " + TC_PARALLELISM + " coordinators reachable; 
got " + coordinators);
+    }
+}
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
new file mode 100644
index 00000000000..8af84ab229e
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/transaction/TcMetadataDiscoveryTestBase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.transaction;
+
+import lombok.CustomLog;
+import org.apache.pulsar.tests.integration.containers.BrokerContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
+
+/**
+ * Base for the metadata-store transaction-coordinator discovery tests. Brings up a multi-broker
+ * cluster with the scalable-topics transaction coordinator enabled, so leadership of the TC
+ * partitions is established via the metadata-store election and clients discover coordinators via
+ * the {@code CommandWatchTcAssignments} stream rather than the assign-topic lookup.
+ *
+ * <p>Scope note: only the transaction-coordinator <em>control plane</em> is enabled here. Producing
+ * / acking data inside a transaction additionally requires the scalable-topic transaction buffer
+ * and pending-ack providers (and {@code segment://} topics), which land with the default flip. These
+ * tests therefore exercise the transaction <em>lifecycle</em> (newTransaction / commit / abort) over
+ * the discovered connections — which is exactly the surface the new client discovery path drives.
+ */
+@CustomLog
+public abstract class TcMetadataDiscoveryTestBase extends PulsarTestSuite {
+
+    /** Number of TC partitions; small so leadership spreads predictably across the brokers. */
+    protected static final int TC_PARALLELISM = 4;
+
+    @Override
+    protected void beforeStartCluster() throws Exception {
+        super.beforeStartCluster();
+        for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
+            // transactionCoordinatorEnabled is present in broker.conf, so the bare env var name
+            // overrides it. The two scalable-topics settings are NOT in broker.conf, so they must
+            // use the PULSAR_PREFIX_ prefix to be appended as new config keys by
+            // apply-config-from-env.py — otherwise a bare name is silently ignored.
+            brokerContainer.withEnv("transactionCoordinatorEnabled", "true");
+            
brokerContainer.withEnv("PULSAR_PREFIX_transactionCoordinatorScalableTopicsEnabled",
 "true");
+            
brokerContainer.withEnv("PULSAR_PREFIX_transactionCoordinatorScalableTopicsParallelism",
+                    Integer.toString(TC_PARALLELISM));
+        }
+    }
+
+    @Override
+    public void setupCluster() throws Exception {
+        super.setupCluster();
+        // The assign-topic partitioned metadata is still created so the legacy ownership-based
+        // fallback in handleClientConnect remains valid during the deprecation window.
+        BrokerContainer brokerContainer = 
pulsarCluster.getBrokers().iterator().next();
+        brokerContainer.execCmd(
+                "/pulsar/bin/pulsar", 
"initialize-transaction-coordinator-metadata",
+                "-cs", ZKContainer.NAME,
+                "-c", pulsarCluster.getClusterName(),
+                "--initial-num-transaction-coordinators", 
Integer.toString(TC_PARALLELISM));
+    }
+}
diff --git a/tests/integration/src/test/resources/pulsar-transaction.xml 
b/tests/integration/src/test/resources/pulsar-transaction.xml
index 72c375d000d..0c23b9e93ab 100644
--- a/tests/integration/src/test/resources/pulsar-transaction.xml
+++ b/tests/integration/src/test/resources/pulsar-transaction.xml
@@ -23,6 +23,7 @@
     <test name="pulsar-transaction-test-suite" preserve-order="true" >
         <classes>
             <class 
name="org.apache.pulsar.tests.integration.transaction.TransactionTest" />
+            <class 
name="org.apache.pulsar.tests.integration.transaction.TcMetadataDiscoveryTest" 
/>
         </classes>
     </test>
 </suite>

Reply via email to