+1.

The only concern is the notification mechanism of the metadata store.
It may be a hidden issue that causes MAX-POSITION to get stuck. Perhaps a 
timeout mechanism needs to be introduced on the transaction participant side as 
well.

On 2026/05/06 00:27:33 Matteo Merli wrote:
> https://github.com/apache/pulsar/pull/25693
> 
> ------
> 
> 
> # PIP-471: Metadata-Driven Transactions for Scalable Topics
> 
> *Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
> 
> ## Background
> 
> ### Pulsar's existing transaction model
> 
> Pulsar transactions today are realized through three components:
> 
> - **Transaction Coordinator (TC)** — a per-broker service backed by a
> system topic (`__transaction_log_*` in `pulsar/system`) that tracks
> the lifecycle of every transaction (`OPEN`, `COMMITTING`, `COMMITTED`,
> `ABORTING`, `ABORTED`, `TIME_OUT`) and orchestrates two-phase commit
> across the topics that participate in each transaction.
> - **TransactionBuffer (TB)** — a per-`PersistentTopic` component that
> buffers transactional writes in the topic's data stream, tracks
> aborted transaction IDs, and gates the dispatcher's read horizon
> (`maxReadPosition`) so that uncommitted entries are not delivered. The
> TB persists its state in a per-namespace system topic
> (`__transaction_buffer_snapshot`).
> - **PendingAckStore** — a per-(topic, subscription) component that
> records transactional acknowledgments in a sibling persistent topic
> (`<topic>-<sub>__transaction_pending_ack`), applying them to the
> cursor only when the transaction commits.
> 
> When a transaction ends, the TC sends `END_TXN_ON_PARTITION` (and
> `END_TXN_ON_SUBSCRIPTION` for acks) to every participant. The TB then
> writes a **commit or abort marker** as a regular entry in the topic's
> managed ledger. The dispatcher discovers committed/aborted state by
> replaying these markers and consulting the in-memory aborted-txn set.
> 
> ### Scalable topics
> 
> [PIP-460](pip-460.md) introduces scalable topics: a logical topic
> backed by a DAG of range segments (`segment://...`) that can be split
> or merged at runtime. Each segment is a regular `PersistentTopic` from
> the broker's perspective, but the segment's lifetime is controlled by
> the [scalable topic controller](pip-468.md) — segments get **sealed**
> when split or merged, after which the segment's managed ledger no
> longer accepts writes.
> 
> ### How the two interact
> 
> The current transaction implementation composes per-`PersistentTopic`.
> With scalable topics, every segment carries its own TB. This
> composition fails in two ways:
> 
> 1. **End-of-transaction stalls on sealed segments.** The TC sends
> `END_TXN_ON_PARTITION` to each segment that received writes. The
> segment's TB tries to append a commit/abort marker — which is a write
> — and the now-sealed segment rejects it. The end-txn RPC times out
> (~30s).
> 2. **Pending-ack topic naming collides with the segment-domain
> parser.** The convention `<topic>-<sub>__transaction_pending_ack` is
> unparseable when `<topic>` is a `segment://...` URI. (Worked around in
> #25631 with a flat persistent name; see "Out of Scope" below.)
> 
> The first issue is structural, not just a routing bug. As long as
> commit/abort decisions need to be persisted **inside the topic's data
> stream**, sealing the topic terminates any in-flight transaction.
> 
> ---
> 
> ## Motivation
> 
> We need transactions that:
> 
> 1. Provide atomicity across multiple writes and acknowledgments,
> possibly spanning multiple topics across multiple namespaces.
> 2. Compose correctly with the scalable-topic lifecycle — including
> splits, merges, and segments sealed mid-transaction.
> 3. Do not require duplicating data (each `producer.send` produces a
> single managed-ledger append).
> 4. Reuse as much of the existing transaction surface as possible —
> interfaces, dispatcher integration, client API — so that we are not
> re-litigating well-understood concerns.
> 5. Coexist with v4 transactions on `persistent://` topics with no
> behavior change for those topics.
> 
> The structural mismatch between in-stream markers and a mutable
> segment DAG cannot be papered over at the routing or the topic-naming
> layer. It needs a transaction representation that does not put the
> decision record inside the data stream.
> 
> ---
> 
> ## Goals
> 
> ### In Scope
> 
> - Atomic transactions over `segment://` topics (writes and acks),
> including transactions whose lifetime spans split/merge.
> - Multi-topic, multi-namespace, multi-segment transactions with the
> same atomicity guarantees as today.
> - Reuse of the existing `Transaction`, `TransactionCoordinator`,
> `TransactionBuffer`, `PendingAckStore`, dispatcher, and client APIs.
> New behavior arrives as alternative implementations behind the
> existing interfaces.
> - Coexistence with the legacy in-stream-marker implementation for
> `persistent://` topics.
> 
> ### Out of Scope
> 
> - Replacing the legacy implementation for non-scalable topics. The new
> implementation is opt-in per topic; `persistent://` topics keep their
> current behavior, including the existing TC.
> - Replacing the segment-aware pending-ack topic name introduced in
> #25631 — that workaround becomes unnecessary as a side effect of this
> PIP and is removed in the same change.
> - Cross-cluster (geo-replicated) transactional semantics.
> 
> ---
> 
> ## High Level Design
> 
> The proposal is one sentence:
> 
> > **Move transactional state out of the data stream and into the metadata 
> > store.**
> 
> Concretely: keep all existing components and interfaces, and add a
> parallel implementation of `TransactionBuffer`, `PendingAckStore`,
> **and Transaction Coordinator** that writes nothing to any data
> stream. Their state lives entirely in the metadata store. The legacy
> in-stream-marker components remain, unchanged, for `persistent://`
> topics; the new metadata-driven components handle `segment://` topics.
> The dispatcher's contract is unchanged.
> 
> Why introduce a v5 TC rather than reuse the legacy one: the legacy TC
> stores its log in a system topic (`__transaction_log_*`), which
> carries the operational concerns of any system topic — compaction can
> lead to long recovery times, leadership has to be maintained, and
> recovery is on the data path. With the metadata store available we can
> have a TC whose state is just a few key-value records, no log, no
> system topic, no per-broker in-memory replay. Running both TC
> implementations in parallel keeps v4 transactions byte-for-byte
> unchanged while the v5 path uses the simpler design.
> 
> ### Why this works for scalable topics
> 
> - **Sealing a segment is irrelevant.** Commit/abort no longer require
> any append to the segment. End-txn becomes a metadata-store CAS on a
> single record. Sealed segments materialize the decision (advance
> cursors, evict cache entries) without writing anything.
> - **The dispatcher does not change.** It already asks the topic's TB
> for `maxReadPosition` and `isTxnAborted`. We swap the source.
> - **Splits/merges do not strand transactions.** Sealed parents and
> live children both consult the same metadata; the decision lives above
> the segments.
> - **No data is duplicated.** Each transactional `send` produces
> exactly one managed-ledger append, same as today.
> 
> ### Architecture Overview
> 
> ```
> ┌──────────────────────────────────────────────────────────────────┐
> │   Client (V5)  -- producer.send(txn,...)                          │
> │                -- consumer.acknowledge(id, txn)                   │
> └─────────────────────────────────┬─────────────────────────────────┘
>                                   │
>             ┌─────────────────────┴─────────────────────┐
>             │                                            │
> ┌───────────▼────────────────┐         ┌────────────────▼──────────────┐
> │   Transaction Coordinator   │         │   Transaction Coordinator V5   │
> │   (legacy, BK-backed log)   │         │   (metadata-store records)     │
> │   → v4 / persistent:// txns │         │   → v5 / segment:// txns       │
> └───────────┬────────────────┘         └────────────────┬──────────────┘
>             │                                            │
>             │ END_TXN_ON_PARTITION / SUBSCRIPTION        │
>             ▼                                            ▼
> ┌─────────────────────────────────────────────────────────────────────┐
> │   Per-topic broker components                                        │
> │                                                                      │
> │   ┌──────────────────────────┐  ┌────────────────────────┐          │
> │   │ TopicTransactionBuffer    │  │ MLPendingAckStore      │          │
> │   │ (in-stream markers)       │  │ (sibling topic)        │          │
> │   │  → persistent:// topics   │  │  → persistent:// topics│          │
> │   └──────────────────────────┘  └────────────────────────┘          │
> │                                                                      │
> │   ┌──────────────────────────┐  ┌────────────────────────┐          │
> │   │ MetadataTransactionBuffer │  │ MetadataPendingAckStore│          │
> │   │ (metadata-store records)  │  │ (metadata-store records)          │
> │   │  → segment:// topics      │  │  → segment:// topics   │          │
> │   └────────┬─────────────────┘  └─────────┬──────────────┘          │
> └────────────┼──────────────────────────────┼─────────────────────────┘
>              │                              │
>              ▼                              ▼
>            Metadata Store — txn coordinator state + txn-op records +
> secondary indexes
> ```
> 
> The `TransactionBufferProvider` and
> `TransactionPendingAckStoreProvider` SPIs already exist. The new TB /
> PendingAckStore implementations slot in behind them. The v5 TC is a
> parallel coordinator selected by the client when it is configured for
> the new path. Selection on the participant side is per-topic, based on
> the topic's domain.
> 
> ---
> 
> ## Detailed Design
> 
> ### Data Model
> 
> The metadata store holds two classes of records and four secondary
> indexes. All records for a given transaction share the same
> **partition key** (`txnId`) so they are co-located — this makes
> per-txn scans (e.g. listing all ops to apply at end-txn time) a
> single-partition operation rather than a fan-out.
> 
> > **A note on metadata-store backends.** The design is 
> > `MetadataStore`-agnostic. It depends on three capabilities — partition-key 
> > co-location, sequential keys, and secondary indexes with range queries and 
> > range-watch — that the `MetadataStore` interface does not expose today. We 
> > extend the interface to surface them; backends that natively support these 
> > (notably Oxia, the intended default) implement them directly, while 
> > backends that don't (e.g. ZooKeeper) can implement them in a less efficient 
> > way (client-side counters for sequential IDs; client-maintained index 
> > records; periodic re-list in lieu of range-watch). Correctness does not 
> > depend on backend choice; throughput and recovery latency may.
> 
> #### Header — one per transaction. Linearization point.
> 
> ```
> /txn/<txnId>                        partitionKey = txnId
>   =  {
>        state:       OPEN | COMMITTED | ABORTED,
>        timeout_ms:  <abs epoch ms>,
>        created_ms:  <abs epoch ms>
>      }
> ```
> 
> State transitions are conditional puts (CAS on version) issued by the
> v5 TC. `OPEN → COMMITTED` and `OPEN → ABORTED` are the only allowed
> transitions; `COMMITTED` and `ABORTED` are terminal.
> 
> #### Operation records — one per transactional write or ack. Unbounded.
> 
> ```
> /txn-op/<txnId>/<seq>               partitionKey = txnId,
>                                     sequential   = true     #
> server-assigned <seq>
>   =  {
>        kind:         "write" | "ack",
>        segment:      "segment://t/n/x/<descriptor>",  # always present
>        subscription: "<sub-fqn>",                     # ack only
>        position:     <ledgerId>:<entryId>
>      }
> ```
> 
> Each operation is its own record, so a transaction has no size limit
> and concurrent participants do not contend on a single record. With
> **sequential keys** the server (or, on backends that lack them, a
> `MetadataStore`-side counter) assigns `<seq>`, eliminating client-side
> collisions.
> 
> #### Secondary indexes (auto-maintained by the metadata store)
> 
> ```
> idx:writes-by-segment              on /txn-op/* where kind=write
>                                    key = segment
>                                    →  range query "writes touching segment S"
> 
> idx:acks-by-segment-subscription   on /txn-op/* where kind=ack
>                                    key = (segment, subscription)
>                                    →  range query "acks on (segment S,
> subscription SU)"
> 
> idx:txn-by-deadline                on /txn/* where state=OPEN
>                                    key = timeout_ms
>                                    →  range query "open txns past deadline"
>                                    →  used by TC for timeout-driven abort
> 
> idx:txn-by-final-state             on /txn/* where state ∈ {COMMITTED, 
> ABORTED}
>                                    key = (state, finalized_ms)
>                                    →  range query "finalized txns ready for 
> GC"
>                                    →  used by GC sweep to find
> finalized txns whose op records can be deleted
> ```
> 
> #### Garbage collection
> 
> A finalized transaction (`COMMITTED` or `ABORTED`) is removed in two phases:
> 
> 1. **Per-participant materialization.** When the TC fans out end-txn,
> each participant broker materializes the decision (commit: advance
> subscription cursors for acks, evict header cache; abort: drop ops).
> Once a participant has finished its materialization for `<txnId>`, it
> deletes its op records (`/txn-op/<txnId>/<seq>` for ops it owns).
> 2. **Header GC sweep.** A periodic sweep scans
> `idx:txn-by-final-state` for entries past a configurable retention
> window (e.g. 60 s after `finalized_ms`). For each, it verifies no
> `/txn-op/<txnId>/*` records remain (orphan check from a participant
> crash), forces deletion of any leftovers, and finally deletes the
> header `/txn/<txnId>`.
> 
> Because all of a txn's records share the same partition (`partitionKey
> = txnId`), the GC sweep's per-txn cleanup stays in one partition: list
> `/txn-op/<txnId>/`, delete, then delete the header.
> 
> Indexes update transactionally with the underlying records, so they 
> self-clean.
> 
> ### Components
> 
> #### `MetadataTransactionBuffer` (new)
> 
> Implements the existing `TransactionBuffer` interface. Used for
> `segment://` topics.
> 
> | Method | Behavior |
> |---|---|
> | `appendBufferToTxn(txnId, buf)` | `ML.asyncAddEntry(buf)`; on
> success, append a sequential `/txn-op/<txnId>/<seq>`
> (`partitionKey=txnId`) with `kind="write", segment, position`. The
> publish ack waits for both. |
> | `commit(txnId, position)` / `abort(...)` | Not invoked by the v5 TC
> (which does not RPC participants). The TB's header watch fires when
> `/txn/<txnId>.state` changes; the TB then materializes locally (evict
> / mark-aborted) and deletes its owned op records. |
> | `getMaxReadPosition()` | Read from in-memory cache. Cache is
> populated by a watch on `idx:writes-by-segment == <my-segment>` joined
> against the header cache. Result: `min(position over OPEN txns) - 1`,
> capped at LAC. |
> | `isTxnAborted(msg)` | Look up `/txn/<txnId>.state` from header cache. |
> | `recover()` | Open the index watch and the header cache; populate
> from the current snapshot. No log replay, no snapshot topic. |
> 
> #### `MetadataPendingAckStore` (new)
> 
> Implements the existing `PendingAckStore` interface. Used for
> `segment://` topic subscriptions.
> 
> | Method | Behavior |
> |---|---|
> | `appendIndividualAck(txnId, positions)` | Append sequential
> `/txn-op/<txnId>/<seq>` records with `kind="ack", segment,
> subscription, position`. |
> | `appendCumulativeAck(...)` | Same shape, single op record carrying
> the cumulative position. |
> | `commit(txnId)` / `abort(txnId)` | Not invoked by the v5 TC.
> Triggered locally when the header watch on `/txn/<txnId>.state` fires.
> Commit: range-query `idx:acks-by-segment-subscription ==
> (<my-segment>, <my-subscription>)` filtered to `<txnId>`; apply to
> cursor (`markDelete` or `individualAck`); range-delete the op records.
> Abort: range-delete the op records, no cursor work. |
> | `replayAsync()` (recovery) | Range-query
> `idx:acks-by-segment-subscription == (<my-segment>,
> <my-subscription>)`, group by `txnId`, hydrate in-memory state. |
> 
> #### Transaction Coordinator V5 (new)
> 
> A parallel coordinator selected by the v5 client. Same client-facing
> wire commands (`NEW_TXN`, `ADD_PARTITION_TO_TXN`,
> `ADD_SUBSCRIPTION_TO_TXN`, `END_TXN`), but no system-topic log: every
> operation reads or CAS's a metadata-store record. **The TC does not
> RPC participants** — see "Notification mechanism" below.
> 
> | Operation | Behavior |
> |---|---|
> | `newTxn(timeoutMs)` | Create `/txn/<txnId>` with `state=OPEN`,
> `timeout_ms=now+timeoutMs`. |
> | `addPartitionToTxn` / `addSubscriptionToTxn` | No-op at the
> coordinator. The participant broker writes its own op records when the
> actual write/ack arrives; the TC never needs to enumerate
> participants. |
> | `endTxn(COMMIT\|ABORT)` | A single CAS on `/txn/<txnId>.state`.
> After it returns, the TC sets `finalized_ms` on the header and acks
> the client. No fan-out, no waiting on participants. |
> | Timeout sweep | Range-query `idx:txn-by-deadline` for entries with
> `timeout_ms ≤ now`, abort each (same single-CAS flow). |
> | GC sweep | Range-query `idx:txn-by-final-state` for entries past
> retention; for each, verify `/txn-op/<txnId>/*` is empty (force-delete
> leftovers); delete header. |
> 
> Why parallel rather than reusing the legacy TC: the legacy TC's
> per-shard system topic (`__transaction_log_*`) requires leadership
> election, runs compaction over its own log, and pays a recovery cost
> on every broker restart proportional to the live transaction set. The
> v5 TC's state is just per-txn KV records — there is no log to compact
> and no cold-start replay. Running both in parallel keeps v4
> transactions byte-for-byte unchanged while the v5 path uses the
> simpler design. A v5 client routes its `NEW_TXN` to the v5 TC; v4
> clients route to the legacy TC. A single transaction does not span the
> two.
> 
> #### Notification mechanism (TC → participants)
> 
> The legacy TC needs to RPC each participant (`END_TXN_ON_PARTITION`,
> `END_TXN_ON_SUBSCRIPTION`) because the participants have no other way
> to learn the decision — the TC's log is the only source of truth, and
> only the TC reads it.
> 
> In the v5 design **the metadata store is the source of truth**, and
> every participant already reads from it. Participants therefore learn
> about state transitions directly from the store, without any
> TC-to-broker RPC:
> 
> - A `MetadataTransactionBuffer` keeps an in-memory header cache for
> txns it has writes from. The cache entries are populated when a write
> op record is appended (the broker reads the header to authorize the
> write) and **kept up to date by point-watches on the headers it has
> cached**.
> - A `MetadataPendingAckStore` maintains the same pattern for txns it
> has acks from.
> - When the TC CAS's `/txn/<txnId>.state` from OPEN →
> COMMITTED/ABORTED, every cached watcher fires. Each participant
> materializes locally:
>   - **Commit** — TB evicts its cache entry (the txn no longer pins
> `maxReadPosition` back); PendingAckStore applies the buffered acks to
> the cursor.
>   - **Abort** — TB marks the txn aborted in its cache (the
> dispatcher's `isTxnAborted` will skip those entries); PendingAckStore
> drops the buffered acks.
> - After materialization, the participant deletes the op records it
> owns (`/txn-op/<txnId>/<seq>` for ops on its segment / subscription).
> - The TC's GC sweep (above) detects when all participants have done
> their cleanup — the prefix `/txn-op/<txnId>/*` is empty — and deletes
> the header.
> 
> Consequences:
> 
> - **End-txn latency.** From the client's perspective, `commit` returns
> as soon as the header CAS lands. From a consumer's perspective,
> freshly-committed entries become visible after the participant's
> header watch fires + materialization runs. That's typically tens of
> milliseconds; bounded by metadata-store watch propagation. (If we ever
> care about a tighter bound — e.g. for a given workload — the TC can
> issue an optional `nudge` RPC to participants in parallel with the
> CAS. Not needed for correctness; not in this PIP.)
> - **No RPC fan-out from TC.** End-txn is `O(1)` work at the TC: one
> CAS. The cost of fan-out is paid by the metadata store's
> watch-delivery infrastructure, which already exists for other Pulsar
> uses.
> - **Crash idempotence.** A participant that crashes during
> materialization restarts, observes the (already-final) header state
> via its watch, and finishes materialization. The TC need not retry
> anything.
> 
> #### Dispatcher
> 
> Unchanged. It already asks
> `topic.getTransactionBuffer().getMaxReadPosition()` and
> `topic.getTransactionBuffer().isTxnAborted(...)`. The new TB
> implements both.
> 
> ### Flows
> 
> #### Publish (transactional)
> 
> ```mermaid
> sequenceDiagram
>     participant C as Client
>     participant B as Segment broker
>     participant ML as Managed Ledger
>     participant M as Metadata Store
> 
>     C->>B: send(txnId, payload)
>     B->>M: read /txn/<txnId>.state  (cached)
>     alt state != OPEN
>         B-->>C: TxnConflict
>     else state == OPEN
>         B->>ML: asyncAddEntry(payload)
>         ML-->>B: position
>         B->>M: put /txn-op/<txnId>/<seq> {kind=write, segment, position}
>         M-->>B: ack
>         B-->>C: send-ack
>     end
> ```
> 
> The header read is cache-first; the cache is invalidated by the same
> watch the TB already maintains on the header. The op-record put is the
> only synchronous metadata-store write on the publish path.
> 
> #### End-txn (commit or abort)
> 
> ```mermaid
> sequenceDiagram
>     participant Cl as Client
>     participant TC as Transaction Coordinator V5
>     participant M as Metadata Store
>     participant P as Participant brokers
> 
>     Cl->>TC: commit(txnId)
>     TC->>M: CAS /txn/<txnId>.state OPEN→COMMITTED, set finalized_ms
>     M-->>TC: ack
>     TC-->>Cl: ack
> 
>     Note over M,P: Independently, asynchronously:
>     M-->>P: header watch fires
>     P->>P: materialize (cursor advance / cache evict)
>     P->>M: delete owned /txn-op/<txnId>/<seq> records
> 
>     Note over TC,M: Later, GC sweep:
>     TC->>M: list /txn-op/<txnId>/*  (empty? then delete header)
> ```
> 
> The CAS on the header is the linearization point — that is when the
> transaction's outcome is decided. Notification of participants is not
> part of the linearization; it propagates via the watches every
> participant already maintains on the headers it has cached. Sealed
> segments are fine — materialization is metadata + cursor work, no
> managed-ledger writes.
> 
> #### Subscribe / dispatch
> 
> Unchanged. The dispatcher polls `tb.getMaxReadPosition()` and filters
> by `tb.isTxnAborted(msg)`. The `MetadataTransactionBuffer` answers
> both from its in-memory caches, fed by metadata-store watches.
> 
> #### Late-write race
> 
> The TC is mid-end-txn when the client publishes once more inside the
> same transaction. The header CAS may have already flipped to
> `COMMITTED`/`ABORTED`. The publish-path header check on the
> participant broker rejects with `TxnConflict`. This mirrors today's TC
> behavior (the TC marks transactions as ENDING and brokers reject new
> writes); the only difference is that the rejection criterion is now
> read from the metadata store rather than from a TC RPC.
> 
> ### Recovery
> 
> - **Broker startup.** Each `MetadataTransactionBuffer` opens its index
> watch and header cache. The first watch event delivers the snapshot;
> the TB is ready as soon as the snapshot has been applied. No log
> replay, no system-topic reader, no snapshot topic.
> - **Broker crash mid-publish.** If the broker appended the entry but
> crashed before writing the op record, the entry exists in the segment
> but no metadata claims it. On txn timeout the TC aborts the txn; the
> dispatcher's `isTxnAborted` check (which falls back to "abort" for
> unknown txnIds at retention horizon) discards the entry.
> - **Broker crash mid-end-txn.** If the header CAS landed but
> materialization on a participant did not complete, the participant
> re-derives state from the header on restart and finishes
> materialization. End-txn is idempotent.
> - **TC failover.** The v5 TC has no in-memory log to replay — its
> state lives in the metadata store. Whichever broker takes over
> coordinator duty for a TC partition resumes operations directly from
> the metadata-store records. Cold-start cost is bounded by
> `idx:txn-by-deadline` and `idx:txn-by-final-state` scans, not by
> replay of an entire transaction log.
> 
> ### Concurrency and contention
> 
> - Each transactional publish writes a unique `/txn-op/<txnId>/<seq>`
> record (server-assigned sequential key). There is no contention
> between concurrent participants of the same transaction.
> - The header is CAS'd at most twice per transaction lifetime (open +
> finalize), so contention there is bounded.
> - All records for a given txn share `partitionKey=txnId`, so per-txn
> operations (list, range-delete) stay on a single partition.
> - Index updates are managed by the metadata store; their scaling is
> the store's concern.
> 
> ---
> 
> ## Public-facing Changes
> 
> ### Public API
> 
> No changes. The client-facing `Transaction` API is unchanged.
> 
> ### Binary protocol
> 
> No changes to client-facing wire commands (`NEW_TXN`,
> `ADD_PARTITION_TO_TXN`, `ADD_SUBSCRIPTION_TO_TXN`, `END_TXN`) — the v5
> TC accepts them with the same semantics as the legacy TC.
> 
> The broker-to-broker commands `END_TXN_ON_PARTITION` and
> `END_TXN_ON_SUBSCRIPTION` are **not used** by the v5 path: participant
> brokers learn about the decision by watching the metadata-store header
> rather than by receiving an RPC from the TC. The legacy TC still uses
> these commands for v4 transactions; they remain in the protocol
> unchanged.
> 
> ### Configuration
> 
> A per-namespace or per-broker setting selects the TB implementation.
> Default for `segment://` topics: metadata-driven. Default for
> `persistent://` topics: in-stream markers (unchanged). Override is
> possible per-namespace for debugging / migration.
> 
> ### Metrics
> 
> Existing transaction metrics remain. The metadata-driven implementation adds:
> 
> - `pulsar_txn_metadata_store_op_writes_total` (counter) — op records written.
> - `pulsar_txn_metadata_store_header_cas_total{result="ok|conflict|reject"}`
> (counter) — header CAS attempts and outcomes.
> - `pulsar_txn_metadata_store_index_query_seconds` (histogram) —
> latency of the index range queries on `idx:writes-by-segment` /
> `idx:acks-by-segment-subscription`.
> - `pulsar_txn_metadata_store_outstanding_op_records` (gauge) —
> uncollected op records (a proxy for txn GC backlog).
> 
> Existing `pulsar_txn_tb_*` snapshot/replay metrics are not emitted by
> the new implementation (no snapshots, no replay).
> 
> ---
> 
> ## Backward & Forward Compatibility
> 
> ### Upgrade
> 
> - Existing `persistent://` topic behavior is unchanged. v4 clients see
> no difference.
> - Brokers running this PIP can interoperate with brokers that do not,
> as long as a given **topic** is consistently served by brokers of one
> kind. Since topic ownership is bundle-based and migration via
> load-balancer transfers TB state across, this is satisfied
> automatically.
> - Per-segment pending-ack topics created by the workaround in #25631
> (`persistent://t/n/<localName>-<descriptor>-<sub>__transaction_pending_ack`)
> are no longer used. They are deleted as part of upgrade. Since the
> workaround was only ever exercised by V5 transactional consumer flows,
> the upgrade path is safe.
> 
> ### Downgrade / Rollback
> 
> Not applicable. Scalable topics are introduced as a new feature in
> Pulsar 5.0 ([PIP-460](pip-460.md)); this PIP defines transactional
> support for that feature from the start. There is no prior version to
> roll back to.
> 
> ### Pulsar Geo-Replication
> 
> Out of scope. Transactional geo-replication is not supported in either model.
> 
> ---
> 
> ## Alternatives Considered
> 
> ### A. Move TB to the scalable-topic level (one TB per logical topic)
> 
> Earlier draft of this design. Architecturally clean — decisions live
> above segments — but introduces a new broker-side singleton per
> scalable topic, adds new failover semantics, and complicates the TC's
> wire protocol (end-txn would need redirection from segment to
> scalable-topic owner). Replacing the per-topic TB **implementation**
> with a metadata-driven one achieves the same correctness without any
> of that surface area.
> 
> ### B. Per-segment TB but using an off-segment marker stream
> 
> Keep the per-segment TB; have it write commit/abort markers to a
> **separate** managed ledger (e.g. a shadow topic) rather than into the
> segment's own data. Sealed segments would no longer block end-txn.
> Rejected because: (1) it doubles the data path (every txn needs a
> write to the segment **and** to the shadow topic), (2) it requires a
> new system-topic-per-segment, and (3) it does not eliminate the
> snapshot/replay machinery that the metadata-driven approach removes
> outright.
> 
> ### C. Skip transactional support on scalable topics
> 
> Document scalable topics as non-transactional. Rejected: the
> transactional consume-and-produce pattern is a primary use case for
> scalable streaming workloads (Kafka Streams analogue), and PIP-460's
> roadmap explicitly calls out transactions across range segments as a
> Phase 4 deliverable.
> 
> ---
> 
> ## General Notes
> 
> The shape of the change is *one new `TransactionBuffer`
> implementation, one new `PendingAckStore` implementation, one new
> `TransactionCoordinator` implementation, and the `MetadataStore`
> extensions to support partition-key co-location, sequential keys, and
> secondary indexes with range-watch*. The complexity is in the
> interaction of the metadata schema with the dispatcher's existing
> assumptions, not in any new system component on the broker.
> 
> ## Links
> 
> - [PIP-460: Scalable Topics](pip-460.md)
> - [PIP-468: Scalable Topic Controller](pip-468.md)
> 
> 
> 
> --
> Matteo Merli
> <[email protected]>
> 
> 

Reply via email to