lhotari commented on code in PR #25693:
URL: https://github.com/apache/pulsar/pull/25693#discussion_r3196624939


##########
pip/pip-473.md:
##########
@@ -0,0 +1,401 @@
+# PIP-471: Metadata-Driven Transactions for Scalable Topics
+
+*Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
+
+## Background
+
+### Pulsar's existing transaction model
+
+Pulsar transactions today are realized through three components:
+
+- **Transaction Coordinator (TC)** — a per-broker service backed by a system 
topic (`__transaction_log_*` in `pulsar/system`) that tracks the lifecycle of 
every transaction (`OPEN`, `COMMITTING`, `COMMITTED`, `ABORTING`, `ABORTED`, 
`TIME_OUT`) and orchestrates two-phase commit across the topics that 
participate in each transaction.
+- **TransactionBuffer (TB)** — a per-`PersistentTopic` component that buffers 
transactional writes in the topic's data stream, tracks aborted transaction 
IDs, and gates the dispatcher's read horizon (`maxReadPosition`) so that 
uncommitted entries are not delivered. The TB persists its state in a 
per-namespace system topic (`__transaction_buffer_snapshot`).
+- **PendingAckStore** — a per-(topic, subscription) component that records 
transactional acknowledgments in a sibling persistent topic 
(`<topic>-<sub>__transaction_pending_ack`), applying them to the cursor only 
when the transaction commits.
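To make the TB's read-side role concrete, here is a minimal Java sketch of the two checks described above: holding `maxReadPosition` back behind the earliest still-open transaction, and filtering out entries from aborted transactions. `TxnReadGate`, `Position`, and every method name are hypothetical and are not Pulsar's `TransactionBuffer` API. The sketch also delivers commit/abort as method calls. In the legacy path, those transitions come from marker entries in the ledger.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** A (ledgerId, entryId) position, ordered the way a managed ledger orders entries. */
record Position(long ledgerId, long entryId) implements Comparable<Position> {
    @Override
    public int compareTo(Position o) {
        int c = Long.compare(ledgerId, o.ledgerId);
        return c != 0 ? c : Long.compare(entryId, o.entryId);
    }
}

/** Hypothetical model of a transaction buffer's read-side state (not Pulsar's API). */
final class TxnReadGate {
    // First position written by each transaction that has not ended yet.
    private final Map<String, Position> ongoingFirstPosition = new HashMap<>();
    private final Set<String> abortedTxns = new HashSet<>();
    private Position lastConfirmed = new Position(0, -1);

    /** Records an appended entry; txnId is null for non-transactional writes. */
    void append(Position pos, String txnId) {
        lastConfirmed = pos;
        if (txnId != null) {
            ongoingFirstPosition.putIfAbsent(txnId, pos);
        }
    }

    // In the legacy path these two transitions are driven by marker entries.
    void onCommit(String txnId) {
        ongoingFirstPosition.remove(txnId);
    }

    void onAbort(String txnId) {
        ongoingFirstPosition.remove(txnId);
        abortedTxns.add(txnId);
    }

    /** Highest readable position: just before the earliest entry of any still-open txn. */
    Position maxReadPosition() {
        Position earliest = null;
        for (Position p : ongoingFirstPosition.values()) {
            if (earliest == null || p.compareTo(earliest) < 0) {
                earliest = p;
            }
        }
        return earliest == null ? lastConfirmed
                : new Position(earliest.ledgerId(), earliest.entryId() - 1);
    }

    boolean isTxnAborted(String txnId) {
        return abortedTxns.contains(txnId);
    }

    /** What the dispatcher effectively checks before delivering an entry. */
    boolean canDispatch(Position pos, String txnId) {
        return pos.compareTo(maxReadPosition()) <= 0
                && (txnId == null || !isTxnAborted(txnId));
    }
}
```

In this model, one open transaction holds back every later entry on the topic, including non-transactional ones, until that transaction ends.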
+
+When a transaction ends, the TC sends `END_TXN_ON_PARTITION` (and 
`END_TXN_ON_SUBSCRIPTION` for acks) to every participant. The TB then writes a 
**commit or abort marker** as a regular entry in the topic's managed ledger. 
The dispatcher discovers committed/aborted state by replaying these markers and 
consulting the in-memory aborted-txn set.

Review Comment:
   regular entry -> marker entry



##########
pip/pip-473.md:
##########
@@ -0,0 +1,401 @@
+
+### Scalable topics
+
+[PIP-460](pip-460.md) introduces scalable topics: a logical topic backed by a 
DAG of range segments (`segment://...`) that can be split or merged at runtime. 
Each segment is a regular `PersistentTopic` from the broker's perspective, but 
the segment's lifetime is controlled by the [scalable topic 
controller](pip-468.md) — segments get **sealed** when split or merged, after 
which the segment's managed ledger no longer accepts writes.
+
+### How the two interact
+
+The current transaction implementation composes per-`PersistentTopic`. With 
scalable topics, every segment carries its own TB. This composition fails in 
two ways:
+
+1. **End-of-transaction stalls on sealed segments.** The TC sends 
`END_TXN_ON_PARTITION` to each segment that received writes. The segment's TB 
tries to append a commit/abort marker — which is a write — and the now-sealed 
segment rejects it. The end-txn RPC times out (~30s).
+2. **Pending-ack topic naming collides with the segment-domain parser.** The 
convention `<topic>-<sub>__transaction_pending_ack` is unparseable when 
`<topic>` is a `segment://...` URI. (Worked around in #25631 with a flat 
persistent name; see "Out of Scope" below.)
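The first failure mode can be reduced to a few lines. The following model is hypothetical: `Segment`, `EndTxnPaths`, and the marker strings are illustrative and are not broker code. In it, a sealed segment rejects appends. Recording the decision as an in-stream marker therefore fails. Recording it in a separate decision map succeeds and leaves the segment untouched. In the real broker, the rejected append surfaces as an end-txn RPC timeout rather than an exception.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical segment model: once sealed, its ledger rejects every append. */
final class Segment {
    private final List<String> entries = new ArrayList<>();
    private boolean sealed;

    void seal() {
        sealed = true;
    }

    void append(String entry) {
        if (sealed) {
            throw new IllegalStateException("segment is sealed, append rejected");
        }
        entries.add(entry);
    }

    int size() {
        return entries.size();
    }
}

/** Two illustrative ways to record an end-txn decision for one segment. */
final class EndTxnPaths {
    /** Legacy style: the decision is a marker entry appended to the data stream. */
    static void endTxnInStream(Segment segment, String txnId, boolean commit) {
        segment.append((commit ? "COMMIT_MARKER:" : "ABORT_MARKER:") + txnId);
    }

    /** Metadata style: the decision lives outside the segment; the first decision wins. */
    static String endTxnInMetadata(Map<String, String> decisions, String txnId, boolean commit) {
        String decision = commit ? "COMMITTED" : "ABORTED";
        String previous = decisions.putIfAbsent(txnId, decision);
        return previous == null ? decision : previous;
    }
}
```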
+
+The first issue is structural, not just a routing bug. As long as commit/abort 
decisions need to be persisted **inside the topic's data stream**, sealing the 
topic terminates any in-flight transaction.
+
+---
+
+## Motivation
+
+We need transactions that:
+
+1. Provide atomicity across multiple writes and acknowledgments, possibly 
spanning multiple topics across multiple namespaces.
+2. Compose correctly with the scalable-topic lifecycle — including splits, 
merges, and segments sealed mid-transaction.
+3. Do not require duplicating data (each `producer.send` produces a single 
managed-ledger append).
+4. Reuse as much of the existing transaction surface as possible — interfaces, 
dispatcher integration, client API — so that we are not re-litigating 
well-understood concerns.
+5. Coexist with v4 transactions on `persistent://` topics with no behavior 
change for those topics.

Review Comment:
   Does this mean a transaction ID for a v4 or v5 transaction can be used 
interchangeably between the two clients?



##########
pip/pip-473.md:
##########
@@ -0,0 +1,401 @@
+
+The structural mismatch between in-stream markers and a mutable segment DAG 
cannot be papered over at the routing or the topic-naming layer. It needs a 
transaction representation that does not put the decision record inside the 
data stream.
+
+---
+
+## Goals
+
+### In Scope
+
+- Atomic transactions over `segment://` topics (writes and acks), including 
transactions whose lifetime spans split/merge.
+- Multi-topic, multi-namespace, multi-segment transactions with the same 
atomicity guarantees as today.
+- Reuse of the existing `Transaction`, `TransactionCoordinator`, 
`TransactionBuffer`, `PendingAckStore`, dispatcher, and client APIs. New 
behavior arrives as alternative implementations behind the existing interfaces.
+- Coexistence with the legacy in-stream-marker implementation for 
`persistent://` topics.
+
+### Out of Scope
+
+- Replacing the legacy implementation for non-scalable topics. The new 
implementation is opt-in per topic; `persistent://` topics keep their current 
behavior, including the existing TC.
+- Replacing the segment-aware pending-ack topic name introduced in #25631 — 
that workaround becomes unnecessary as a side effect of this PIP and is removed 
in the same change.
+- Cross-cluster (geo-replicated) transactional semantics.
+
+---
+
+## High Level Design
+
+The proposal is one sentence:
+
+> **Move transactional state out of the data stream and into the metadata 
store.**
+
+Concretely: keep all existing components and interfaces, and add a parallel 
implementation of `TransactionBuffer`, `PendingAckStore`, **and Transaction 
Coordinator** that writes nothing to any data stream. Their state lives 
entirely in the metadata store. The legacy in-stream-marker components remain, 
unchanged, for `persistent://` topics; the new metadata-driven components 
handle `segment://` topics. The dispatcher's contract is unchanged.
+
+Why introduce a v5 TC rather than reuse the legacy one: the legacy TC stores 
its log in a system topic (`__transaction_log_*`), which carries the 
operational concerns of any system topic — compaction can lead to long recovery 
times, leadership has to be maintained, and recovery is on the data path. With 
the metadata store available we can have a TC whose state is just a few 
key-value records, no log, no system topic, no per-broker in-memory replay. 
Running both TC implementations in parallel keeps v4 transactions byte-for-byte 
unchanged while the v5 path uses the simpler design.

Review Comment:
   What would be the migration solution, and a possible rollback solution, for 
existing v4 topic data that has been written in the v4 transaction format?



##########
pip/pip-473.md:
##########
@@ -0,0 +1,401 @@
+
+### Why this works for scalable topics
+
+- **Sealing a segment is irrelevant.** Commit/abort no longer require any 
append to the segment. End-txn becomes a metadata-store CAS on a single record. 
Sealed segments materialize the decision (advance cursors, evict cache entries) 
without writing anything.
+- **The dispatcher does not change.** It already asks the topic's TB for 
`maxReadPosition` and `isTxnAborted`. We swap the source.
+- **Splits/merges do not strand transactions.** Sealed parents and live 
children both consult the same metadata; the decision lives above the segments.
+- **No data is duplicated.** Each transactional `send` produces exactly one 
managed-ledger append, same as today.
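The claim that the dispatcher contract is unchanged can be illustrated with a small sketch. A sealed parent and a live child both answer `maxReadPosition` and `isTxnAborted` from the same shared header map. A single state change is therefore visible to both, and neither segment receives a write. The following are hypothetical stand-ins, assumed only for illustration:

- `ReadGate` stands in for the TB interface.
- `MetadataBackedGate` stands in for the new metadata-backed TB.
- The per-segment `firstWriteByTxn` map stands in for the `idx:writes-by-segment` index.

```java
import java.util.HashMap;
import java.util.Map;

enum TxnState { OPEN, COMMITTED, ABORTED }

record Position(long ledgerId, long entryId) implements Comparable<Position> {
    @Override
    public int compareTo(Position o) {
        int c = Long.compare(ledgerId, o.ledgerId);
        return c != 0 ? c : Long.compare(entryId, o.entryId);
    }
}

/** The two questions the dispatcher asks; signatures here are illustrative only. */
interface ReadGate {
    Position maxReadPosition();
    boolean isTxnAborted(String txnId);
}

/** Hypothetical metadata-backed gate: reads shared headers, never writes to the segment. */
final class MetadataBackedGate implements ReadGate {
    private final Map<String, TxnState> headers;          // shared view of /txn/<txnId>
    private final Map<String, Position> firstWriteByTxn;  // this segment's slice of writes-by-segment
    private final Position lastConfirmed;                 // fixed once sealed; a live segment would refresh it

    MetadataBackedGate(Map<String, TxnState> headers,
                       Map<String, Position> firstWriteByTxn,
                       Position lastConfirmed) {
        this.headers = headers;
        this.firstWriteByTxn = firstWriteByTxn;
        this.lastConfirmed = lastConfirmed;
    }

    @Override
    public Position maxReadPosition() {
        Position earliestOpen = null;
        for (Map.Entry<String, Position> e : firstWriteByTxn.entrySet()) {
            boolean open = headers.get(e.getKey()) == TxnState.OPEN;
            if (open && (earliestOpen == null || e.getValue().compareTo(earliestOpen) < 0)) {
                earliestOpen = e.getValue();
            }
        }
        return earliestOpen == null ? lastConfirmed
                : new Position(earliestOpen.ledgerId(), earliestOpen.entryId() - 1);
    }

    @Override
    public boolean isTxnAborted(String txnId) {
        return headers.get(txnId) == TxnState.ABORTED;
    }
}
```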
+
+### Architecture Overview
+
+```
+┌──────────────────────────────────────────────────────────────────┐
+│   Client (V5)  -- producer.send(txn,...)                          │
+│                -- consumer.acknowledge(id, txn)                   │
+└─────────────────────────────────┬─────────────────────────────────┘
+                                  │
+            ┌─────────────────────┴─────────────────────┐
+            │                                            │
+┌───────────▼────────────────┐         ┌────────────────▼──────────────┐
+│   Transaction Coordinator   │         │   Transaction Coordinator V5   │
+│   (legacy, BK-backed log)   │         │   (metadata-store records)     │
+│   → v4 / persistent:// txns │         │   → v5 / segment:// txns       │
+└───────────┬────────────────┘         └────────────────┬──────────────┘
+            │                                            │
+            │ END_TXN_ON_PARTITION / SUBSCRIPTION        │
+            ▼                                            ▼
+┌─────────────────────────────────────────────────────────────────────┐
+│   Per-topic broker components                                        │
+│                                                                      │
+│   ┌──────────────────────────┐  ┌────────────────────────┐          │
+│   │ TopicTransactionBuffer    │  │ MLPendingAckStore      │          │
+│   │ (in-stream markers)       │  │ (sibling topic)        │          │
+│   │  → persistent:// topics   │  │  → persistent:// topics│          │
+│   └──────────────────────────┘  └────────────────────────┘          │
+│                                                                      │
+│   ┌──────────────────────────┐  ┌────────────────────────┐          │
+│   │ MetadataTransactionBuffer │  │ MetadataPendingAckStore│          │
+│   │ (metadata-store records)  │  │ (metadata-store records)          │
+│   │  → segment:// topics      │  │  → segment:// topics   │          │
+│   └────────┬─────────────────┘  └─────────┬──────────────┘          │
+└────────────┼──────────────────────────────┼─────────────────────────┘
+             │                              │
+             ▼                              ▼
+           Metadata Store — txn coordinator state + txn-op records + secondary indexes
+```
+
+The `TransactionBufferProvider` and `TransactionPendingAckStoreProvider` SPIs 
already exist. The new TB / PendingAckStore implementations slot in behind 
them. The v5 TC is a parallel coordinator selected by the client when it is 
configured for the new path. Selection on the participant side is per-topic, 
based on the topic's domain.
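Per-topic selection by domain could look roughly like the following sketch. `TxnImpl` and `TxnImplSelector` are hypothetical names. They are not the `TransactionBufferProvider` or `TransactionPendingAckStoreProvider` SPIs. Rejecting other domains, such as `non-persistent://`, is also an assumption of the sketch.

```java
/** Which implementation family handles a topic's transactional state (illustrative). */
enum TxnImpl { IN_STREAM_MARKERS, METADATA_STORE }

/** Hypothetical per-topic selector keyed on the topic name's domain. */
final class TxnImplSelector {
    static TxnImpl forTopic(String topicName) {
        int sep = topicName.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("not a fully qualified topic name: " + topicName);
        }
        String domain = topicName.substring(0, sep);
        return switch (domain) {
            case "segment" -> TxnImpl.METADATA_STORE;        // metadata-driven TB / PendingAckStore
            case "persistent" -> TxnImpl.IN_STREAM_MARKERS;  // legacy path, unchanged
            // Assumption of this sketch: other domains have no transactional implementation.
            default -> throw new IllegalArgumentException("no transactional implementation for: " + domain);
        };
    }
}
```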
+
+---
+
+## Detailed Design
+
+### Data Model
+
+The metadata store holds two classes of records and four secondary indexes. 
All records for a given transaction share the same **partition key** (`txnId`) 
so they are co-located — this makes per-txn scans (e.g. listing all ops to 
apply at end-txn time) a single-partition operation rather than a fan-out.
+
+> **A note on metadata-store backends.** The design is 
`MetadataStore`-agnostic. It depends on three capabilities — partition-key 
co-location, sequential keys, and secondary indexes with range queries and 
range-watch — that the `MetadataStore` interface does not expose today. We 
extend the interface to surface them; backends that natively support these 
(notably Oxia, the intended default) implement them directly, while backends 
that don't (e.g. ZooKeeper) can implement them in a less efficient way 
(client-side counters for sequential IDs; client-maintained index records; 
periodic re-list in lieu of range-watch). Correctness does not depend on 
backend choice; throughput and recovery latency may.
+
+#### Header — one per transaction. Linearization point.
+
+```
+/txn/<txnId>                        partitionKey = txnId
+  =  {
+       state:       OPEN | COMMITTED | ABORTED,
+       timeout_ms:  <abs epoch ms>,
+       created_ms:  <abs epoch ms>
+     }
+```
+
+State transitions are conditional puts (CAS on version) issued by the v5 TC. 
`OPEN → COMMITTED` and `OPEN → ABORTED` are the only allowed transitions; 
`COMMITTED` and `ABORTED` are terminal.
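The header's state machine is small enough to sketch directly. This minimal Java illustration assumes a hypothetical in-memory `HeaderStore`. Each key carries a version, and a conditional put must match it. The store stands in for the metadata store's CAS and is not the `MetadataStore` API. The commit path and a timeout-driven abort both go through the same `end` helper. Whichever CAS lands first decides the outcome, and the loser reads back the terminal state instead of overwriting it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

enum TxnState { OPEN, COMMITTED, ABORTED }

/** A stored value plus the store-assigned version used for CAS. */
record Versioned<T>(T value, long version) {}

/** Hypothetical in-memory stand-in for a metadata store with conditional puts. */
final class HeaderStore {
    private final Map<String, Versioned<TxnState>> records = new ConcurrentHashMap<>();

    void create(String txnId) {
        if (records.putIfAbsent(txnId, new Versioned<>(TxnState.OPEN, 0)) != null) {
            throw new IllegalStateException("txn already exists: " + txnId);
        }
    }

    Versioned<TxnState> get(String txnId) {
        return records.get(txnId);
    }

    /** Succeeds only if the stored version still equals expectedVersion. */
    boolean putIfVersion(String txnId, TxnState next, long expectedVersion) {
        Versioned<TxnState> cur = records.get(txnId);
        if (cur == null || cur.version() != expectedVersion) {
            return false;
        }
        return records.replace(txnId, cur, new Versioned<>(next, expectedVersion + 1));
    }
}

final class TxnEnder {
    /** Attempts OPEN -> target and returns the final state, whoever won the race. */
    static TxnState end(HeaderStore store, String txnId, TxnState target) {
        if (target == TxnState.OPEN) {
            throw new IllegalArgumentException("target must be terminal");
        }
        while (true) {
            Versioned<TxnState> cur = store.get(txnId);
            if (cur == null) {
                throw new IllegalStateException("unknown txn: " + txnId);
            }
            if (cur.value() != TxnState.OPEN) {
                return cur.value();            // already terminal: never overwrite
            }
            if (store.putIfVersion(txnId, target, cur.version())) {
                return target;                 // our CAS was the linearization point
            }
            // Lost the CAS race: re-read and re-check.
        }
    }
}
```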
+
+#### Operation records — one per transactional write or ack. Unbounded.
+
+```
+/txn-op/<txnId>/<seq>               partitionKey = txnId,
+                                    sequential   = true     # server-assigned <seq>
+  =  {
+       kind:         "write" | "ack",
+       segment:      "segment://t/n/x/<descriptor>",  # always present
+       subscription: "<sub-fqn>",                     # ack only
+       position:     <ledgerId>:<entryId>
+     }
+```
+
+Each operation is its own record, so a transaction has no size limit and 
concurrent participants do not contend on a single record. With **sequential 
keys** the server (or, on backends that lack them, a `MetadataStore`-side 
counter) assigns `<seq>`, eliminating client-side collisions.
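The following sketch shows how op records could be appended and then read back per transaction, using a hypothetical in-memory store. The per-txn `AtomicLong` plays the role of the counter fallback for backends without native sequential keys. The zero-padded suffix is an assumption that keeps lexicographic key order equal to `<seq>` order. With that ordering, listing a transaction's ops is a single prefix range scan.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

/** One /txn-op record; subscription is null for writes. */
record TxnOp(String kind, String segment, String subscription, long ledgerId, long entryId) {}

/** Hypothetical store that appends /txn-op/<txnId>/<seq> with a store-assigned seq. */
final class TxnOpStore {
    private final ConcurrentSkipListMap<String, TxnOp> records = new ConcurrentSkipListMap<>();
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    String append(String txnId, TxnOp op) {
        long seq = counters.computeIfAbsent(txnId, k -> new AtomicLong()).getAndIncrement();
        // Zero-pad so that lexicographic key order matches numeric seq order.
        String key = String.format("/txn-op/%s/%020d", txnId, seq);
        records.put(key, op);
        return key;
    }

    /** All ops of one txn, in seq order: a single prefix range scan. */
    List<TxnOp> opsOf(String txnId) {
        String prefix = "/txn-op/" + txnId + "/";
        return new ArrayList<>(records.subMap(prefix, prefix + Character.MAX_VALUE).values());
    }
}
```

The trailing `/` in the scan prefix matters. Without it, a scan for `t1` would also match the records of `t10`.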
+
+#### Secondary indexes (auto-maintained by the metadata store)
+
+```
+idx:writes-by-segment              on /txn-op/* where kind=write
+                                   key = segment
+                                   →  range query "writes touching segment S"
+
+idx:acks-by-segment-subscription   on /txn-op/* where kind=ack
+                                   key = (segment, subscription)
+                                   →  range query "acks on (segment S, subscription SU)"
+
+idx:txn-by-deadline                on /txn/* where state=OPEN
+                                   key = timeout_ms
+                                   →  range query "open txns past deadline"
+                                   →  used by TC for timeout-driven abort
+
+idx:txn-by-final-state             on /txn/* where state ∈ {COMMITTED, ABORTED}
+                                   key = (state, finalized_ms)
+                                   →  range query "finalized txns ready for GC"
+                                   →  used by GC sweep to find finalized txns whose op records can be deleted
+```
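The `idx:txn-by-deadline` range query can be sketched with an ordered set. In the proposal, the metadata store maintains the index automatically through its `where state=OPEN` filter. In this hypothetical model, the caller updates it explicitly through `onOpen` and `onFinalized`. The composite `(timeout_ms, txnId)` key is an assumption that keeps two transactions with the same deadline from colliding.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Composite index key ordered by deadline, then by txn id. */
record DeadlineKey(long timeoutMs, String txnId) implements Comparable<DeadlineKey> {
    @Override
    public int compareTo(DeadlineKey o) {
        int c = Long.compare(timeoutMs, o.timeoutMs);
        return c != 0 ? c : txnId.compareTo(o.txnId);
    }
}

/** Hypothetical model of idx:txn-by-deadline: only OPEN txns are indexed. */
final class DeadlineIndex {
    private final TreeSet<DeadlineKey> index = new TreeSet<>();

    /** Called when a header is created in state OPEN. */
    void onOpen(String txnId, long timeoutMs) {
        index.add(new DeadlineKey(timeoutMs, txnId));
    }

    /** Called when a header leaves OPEN; the entry drops out of the index. */
    void onFinalized(String txnId, long timeoutMs) {
        index.remove(new DeadlineKey(timeoutMs, txnId));
    }

    /** Range query: open txns whose deadline is at or before nowMs, earliest first. */
    List<String> expired(long nowMs) {
        List<String> out = new ArrayList<>();
        for (DeadlineKey k : index) {
            if (k.timeoutMs() > nowMs) {
                break;  // sorted by deadline, so nothing later can be expired
            }
            out.add(k.txnId());
        }
        return out;
    }
}
```

Each expired id would then go through the same header CAS as an explicit abort. A sweep that races with a client commit therefore cannot overwrite a terminal state.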
+
+#### Garbage collection
+
+A finalized transaction (`COMMITTED` or `ABORTED`) is removed in two phases:
+
+1. **Per-participant materialization.** When the TC fans out end-txn, each 
participant broker materializes the decision (commit: advance subscription 
cursors for acks, evict header cache; abort: drop ops). Once a participant has 
finished its materialization for `<txnId>`, it deletes its op records 
(`/txn-op/<txnId>/<seq>` for ops it owns).

Review Comment:
   Is it possible to remove any op records for finalized txns before the data 
has been deleted? As long as the data is retained, do the txn records need to 
exist so that the entries can be consumed consistently? Without txn 
commit/abort information, it wouldn't be possible to determine whether certain 
records should be skipped when a new subscription or 
reader/checkpointconsumer reads the records.



##########
pip/pip-471.md:
##########
@@ -0,0 +1,401 @@
+# PIP-471: Metadata-Driven Transactions for Scalable Topics
+
+*Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
+
+## Background
+
+### Pulsar's existing transaction model
+
+Pulsar transactions today are realized through three components:
+
+- **Transaction Coordinator (TC)** — a per-broker service backed by a system 
topic (`__transaction_log_*` in `pulsar/system`) that tracks the lifecycle of 
every transaction (`OPEN`, `COMMITTING`, `COMMITTED`, `ABORTING`, `ABORTED`, 
`TIME_OUT`) and orchestrates two-phase commit across the topics that 
participate in each transaction.
+- **TransactionBuffer (TB)** — a per-`PersistentTopic` component that buffers 
transactional writes in the topic's data stream, tracks aborted transaction 
IDs, and gates the dispatcher's read horizon (`maxReadPosition`) so that 
uncommitted entries are not delivered. The TB persists its state in a 
per-namespace system topic (`__transaction_buffer_snapshot`).
+- **PendingAckStore** — a per-(topic, subscription) component that records 
transactional acknowledgments in a sibling persistent topic 
(`<topic>-<sub>__transaction_pending_ack`), applying them to the cursor only 
when the transaction commits.
+
+When a transaction ends, the TC sends `END_TXN_ON_PARTITION` (and 
`END_TXN_ON_SUBSCRIPTION` for acks) to every participant. The TB then writes a 
**commit or abort marker** as a regular entry in the topic's managed ledger. 
The dispatcher discovers committed/aborted state by replaying these markers and 
consulting the in-memory aborted-txn set.
+
+### Scalable topics
+
+[PIP-460](pip-460.md) introduces scalable topics: a logical topic backed by a 
DAG of range segments (`segment://...`) that can be split or merged at runtime. 
Each segment is a regular `PersistentTopic` from the broker's perspective, but 
the segment's lifetime is controlled by the [scalable topic 
controller](pip-468.md) — segments get **sealed** when split or merged, after 
which the segment's managed ledger no longer accepts writes.
+
+### How the two interact
+
+The current transaction implementation composes per-`PersistentTopic`. With 
scalable topics, every segment carries its own TB. This composition fails in 
two ways:
+
+1. **End-of-transaction stalls on sealed segments.** The TC sends 
`END_TXN_ON_PARTITION` to each segment that received writes. The segment's TB 
tries to append a commit/abort marker — which is a write — and the now-sealed 
segment rejects it. The end-txn RPC times out (~30s).
+2. **Pending-ack topic naming collides with the segment-domain parser.** The 
convention `<topic>-<sub>__transaction_pending_ack` is unparseable when 
`<topic>` is a `segment://...` URI. (Worked around in #25631 with a flat 
persistent name; see "Out of Scope" below.)
+
+The first issue is structural, not just a routing bug. As long as commit/abort 
decisions need to be persisted **inside the topic's data stream**, sealing the 
topic terminates any in-flight transaction.
+
+---
+
+## Motivation
+
+We need transactions that:
+
+1. Provide atomicity across multiple writes and acknowledgments, possibly 
spanning multiple topics across multiple namespaces.
+2. Compose correctly with the scalable-topic lifecycle — including splits, 
merges, and segments sealed mid-transaction.
+3. Do not require duplicating data (each `producer.send` produces a single 
managed-ledger append).
+4. Reuse as much of the existing transaction surface as possible — interfaces, 
dispatcher integration, client API — so that we are not re-litigating 
well-understood concerns.
+5. Coexist with v4 transactions on `persistent://` topics with no behavior 
change for those topics.
+
+The structural mismatch between in-stream markers and a mutable segment DAG 
cannot be papered over at the routing or the topic-naming layer. It needs a 
transaction representation that does not put the decision record inside the 
data stream.
+
+---
+
+## Goals
+
+### In Scope
+
+- Atomic transactions over `segment://` topics (writes and acks), including 
transactions whose lifetime spans split/merge.
+- Multi-topic, multi-namespace, multi-segment transactions with the same 
atomicity guarantees as today.
+- Reuse of the existing `Transaction`, `TransactionCoordinator`, 
`TransactionBuffer`, `PendingAckStore`, dispatcher, and client APIs. New 
behavior arrives as alternative implementations behind the existing interfaces.
+- Coexistence with the legacy in-stream-marker implementation for 
`persistent://` topics.
+
+### Out of Scope
+
+- Replacing the legacy implementation for non-scalable topics. The new 
implementation is opt-in per topic; `persistent://` topics keep their current 
behavior, including the existing TC.
+- Replacing the segment-aware pending-ack topic name introduced in #25631 — 
that workaround becomes unnecessary as a side effect of this PIP and is removed 
in the same change.
+- Cross-cluster (geo-replicated) transactional semantics.
+
+---
+
+## High Level Design
+
+The proposal is one sentence:
+
+> **Move transactional state out of the data stream and into the metadata 
store.**
+
+Concretely: keep all existing components and interfaces, and add a parallel 
implementation of `TransactionBuffer`, `PendingAckStore`, **and Transaction 
Coordinator** that writes nothing to any data stream. Their state lives 
entirely in the metadata store. The legacy in-stream-marker components remain, 
unchanged, for `persistent://` topics; the new metadata-driven components 
handle `segment://` topics. The dispatcher's contract is unchanged.
+
+Why introduce a v5 TC rather than reuse the legacy one: the legacy TC stores 
its log in a system topic (`__transaction_log_*`), which carries the 
operational concerns of any system topic — compaction can lead to long recovery 
times, leadership has to be maintained, and recovery is on the data path. With 
the metadata store available we can have a TC whose state is just a few 
key-value records, no log, no system topic, no per-broker in-memory replay. 
Running both TC implementations in parallel keeps v4 transactions byte-for-byte 
unchanged while the v5 path uses the simpler design.
+
+### Why this works for scalable topics
+
+- **Sealing a segment is irrelevant.** Commit/abort no longer require any append to the segment. End-txn becomes a metadata-store CAS on a single record. Sealed segments still materialize the decision (advance cursors, evict cache entries) without appending anything to the segment.
+- **The dispatcher does not change.** It already asks the topic's TB for 
`maxReadPosition` and `isTxnAborted`. We swap the source.
+- **Splits/merges do not strand transactions.** Sealed parents and live 
children both consult the same metadata; the decision lives above the segments.
+- **No data is duplicated.** Each transactional `send` produces exactly one 
managed-ledger append, same as today.
+
+### Architecture Overview
+
+```
+┌─────────────────────────────────────────────────────────────────┐
+│   Client (V5)  -- producer.send(txn,...)                        │
+│                -- consumer.acknowledge(id, txn)                 │
+└────────────────────────────────┬────────────────────────────────┘
+                                 │
+               ┌─────────────────┴─────────────────┐
+               │                                   │
+┌──────────────▼──────────────┐     ┌──────────────▼──────────────┐
+│  Transaction Coordinator    │     │  Transaction Coordinator V5 │
+│  (legacy, BK-backed log)    │     │  (metadata-store records)   │
+│  → v4 / persistent:// txns  │     │  → v5 / segment:// txns     │
+└──────────────┬──────────────┘     └──────────────┬──────────────┘
+               │ END_TXN_ON_PARTITION              │ header watch
+               │ END_TXN_ON_SUBSCRIPTION           │ (no RPC)
+               ▼                                   ▼
+┌─────────────────────────────────────────────────────────────────┐
+│   Per-topic broker components                                   │
+│                                                                 │
+│   ┌───────────────────────────┐  ┌──────────────────────────┐   │
+│   │ TopicTransactionBuffer    │  │ MLPendingAckStore        │   │
+│   │ (in-stream markers)       │  │ (sibling topic)          │   │
+│   │  → persistent:// topics   │  │  → persistent:// topics  │   │
+│   └───────────────────────────┘  └──────────────────────────┘   │
+│                                                                 │
+│   ┌───────────────────────────┐  ┌──────────────────────────┐   │
+│   │ MetadataTransactionBuffer │  │ MetadataPendingAckStore  │   │
+│   │ (metadata-store records)  │  │ (metadata-store records) │   │
+│   │  → segment:// topics      │  │  → segment:// topics     │   │
+│   └────────┬──────────────────┘  └─────────┬────────────────┘   │
+└────────────┼───────────────────────────────┼────────────────────┘
+             │                               │
+             ▼                               ▼
+      Metadata Store: txn coordinator state + txn-op records + secondary indexes
+```
+
+The `TransactionBufferProvider` and `TransactionPendingAckStoreProvider` SPIs 
already exist. The new TB / PendingAckStore implementations slot in behind 
them. The v5 TC is a parallel coordinator selected by the client when it is 
configured for the new path. Selection on the participant side is per-topic, 
based on the topic's domain.
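A minimal sketch of the per-topic selection, assuming the domain prefix of the topic name is the only input. `TxnBackend` and `TxnBackendSelector.selectBackend` are hypothetical names for illustration, not the real `TransactionBufferProvider` SPI; the sketch rejects any domain other than `persistent://` and `segment://`.

```java
import java.util.Locale;

/** Which transaction-state implementation backs a topic (hypothetical names). */
enum TxnBackend {
    IN_STREAM_MARKERS, // TopicTransactionBuffer + MLPendingAckStore
    METADATA_STORE     // MetadataTransactionBuffer + MetadataPendingAckStore
}

/** Hypothetical per-topic selector keyed on the topic's domain. */
final class TxnBackendSelector {
    private TxnBackendSelector() {}

    static TxnBackend selectBackend(String topicName) {
        int sep = topicName.indexOf("://");
        if (sep < 0) {
            throw new IllegalArgumentException("topic name has no domain: " + topicName);
        }
        String domain = topicName.substring(0, sep).toLowerCase(Locale.ROOT);
        switch (domain) {
            case "segment":
                return TxnBackend.METADATA_STORE;    // v5 path: state lives in the metadata store
            case "persistent":
                return TxnBackend.IN_STREAM_MARKERS; // legacy path, unchanged
            default:
                throw new IllegalArgumentException("no transaction support for domain: " + domain);
        }
    }
}
```

Because the choice is a pure function of the topic name, the dispatcher and clients need no configuration to know which implementation they are talking to.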
+
+---
+
+## Detailed Design
+
+### Data Model
+
+The metadata store holds two classes of records and four secondary indexes. 
All records for a given transaction share the same **partition key** (`txnId`) 
so they are co-located — this makes per-txn scans (e.g. listing all ops to 
apply at end-txn time) a single-partition operation rather than a fan-out.
+
+> **A note on metadata-store backends.** The design is 
`MetadataStore`-agnostic. It depends on three capabilities — partition-key 
co-location, sequential keys, and secondary indexes with range queries and 
range-watch — that the `MetadataStore` interface does not expose today. We 
extend the interface to surface them; backends that natively support these 
(notably Oxia, the intended default) implement them directly, while backends 
that don't (e.g. ZooKeeper) can implement them in a less efficient way 
(client-side counters for sequential IDs; client-maintained index records; 
periodic re-list in lieu of range-watch). Correctness does not depend on 
backend choice; throughput and recovery latency may.
+
+#### Header — one per transaction. Linearization point.
+
+```
+/txn/<txnId>                        partitionKey = txnId
+  =  {
+       state:        OPEN | COMMITTED | ABORTED,
+       timeout_ms:   <abs epoch ms>,
+       created_ms:   <abs epoch ms>,
+       finalized_ms: <abs epoch ms>   # set by the end-txn CAS
+     }
+```
+
+State transitions are conditional puts (CAS on version) issued by the v5 TC. 
`OPEN → COMMITTED` and `OPEN → ABORTED` are the only allowed transitions; 
`COMMITTED` and `ABORTED` are terminal.
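The header state machine can be sketched as below, assuming an in-memory map that stands in for the metadata store's conditional put (CAS on version). `HeaderStore`, `TxnHeader`, and `HeaderTransitions` are hypothetical; the point is that a losing CAS re-reads the header and never overwrites a terminal state.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

enum TxnState { OPEN, COMMITTED, ABORTED }

/** Snapshot of /txn/<txnId> plus the store version it was read at (hypothetical). */
final class TxnHeader {
    final TxnState state;
    final long version;

    TxnHeader(TxnState state, long version) {
        this.state = state;
        this.version = version;
    }
}

/** In-memory stand-in for a metadata store with conditional put (CAS on version). */
final class HeaderStore {
    private final Map<String, TxnHeader> headers = new ConcurrentHashMap<>();

    void create(String txnId) {
        if (headers.putIfAbsent(txnId, new TxnHeader(TxnState.OPEN, 0)) != null) {
            throw new IllegalStateException("txn already exists: " + txnId);
        }
    }

    Optional<TxnHeader> get(String txnId) {
        return Optional.ofNullable(headers.get(txnId));
    }

    /** Writes the new state only if the stored version still equals expectedVersion. */
    boolean casState(String txnId, long expectedVersion, TxnState next) {
        TxnHeader current = headers.get(txnId);
        if (current == null || current.version != expectedVersion) {
            return false;
        }
        // replace(k, old, new) is atomic; TxnHeader has no equals override, so it compares identity.
        return headers.replace(txnId, current, new TxnHeader(next, expectedVersion + 1));
    }
}

final class HeaderTransitions {
    private HeaderTransitions() {}

    /** OPEN -> COMMITTED and OPEN -> ABORTED are the only allowed transitions. */
    static boolean isAllowed(TxnState from, TxnState to) {
        return from == TxnState.OPEN && to != TxnState.OPEN;
    }

    /** Tries to finalize the txn; returns the terminal state actually stored afterwards. */
    static TxnState endTxn(HeaderStore store, String txnId, TxnState target) {
        if (target == TxnState.OPEN) {
            throw new IllegalArgumentException("target must be COMMITTED or ABORTED");
        }
        while (true) {
            TxnHeader h = store.get(txnId)
                    .orElseThrow(() -> new IllegalStateException("unknown txn " + txnId));
            if (!isAllowed(h.state, target)) {
                return h.state; // already terminal; terminal states never change
            }
            if (store.casState(txnId, h.version, target)) {
                return target;  // this CAS is the linearization point
            }
            // Lost a race on the version: re-read and re-check.
        }
    }
}
```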
+
+#### Operation records — one per transactional write or ack. Unbounded.
+
+```
+/txn-op/<txnId>/<seq>               partitionKey = txnId,
+                                    sequential   = true     # server-assigned <seq>
+  =  {
+       kind:         "write" | "ack",
+       segment:      "segment://t/n/x/<descriptor>",  # always present
+       subscription: "<sub-fqn>",                     # ack only
+       position:     <ledgerId>:<entryId>
+     }
+```
+
+Each operation is its own record, so a transaction has no size limit and 
concurrent participants do not contend on a single record. With **sequential 
keys** the server (or, on backends that lack them, a `MetadataStore`-side 
counter) assigns `<seq>`, eliminating client-side collisions.
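A sketch of the fallback for backends without server-assigned sequential keys, assuming a per-`txnId` counter. `OpRecordLog` is hypothetical and single-process; in a multi-broker deployment the counter would itself have to live in the metadata store and be advanced with a conditional put. The zero-padded `%010d` suffix is an assumption chosen so lexicographic key order matches append order.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical fallback for backends without server-assigned sequential keys.
 * Single-process only: across brokers the counter itself must live in the store.
 */
final class OpRecordLog {
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();
    // Sorted, so a prefix scan of /txn-op/<txnId>/ returns ops in append order.
    private final ConcurrentSkipListMap<String, String> records = new ConcurrentSkipListMap<>();

    /** Appends one op record and returns its key, e.g. /txn-op/tx1/0000000000. */
    String append(String txnId, String op) {
        long seq = counters.computeIfAbsent(txnId, k -> new AtomicLong()).getAndIncrement();
        String key = String.format("/txn-op/%s/%010d", txnId, seq); // zero-padded: string order == seq order
        records.put(key, op);
        return key;
    }

    /** All op records of one txn (same partition key, so a single-partition scan). */
    NavigableMap<String, String> list(String txnId) {
        String prefix = "/txn-op/" + txnId + "/";
        return records.subMap(prefix, true, prefix + Character.MAX_VALUE, false);
    }
}
```

The trailing `/` in the prefix keeps `tx1` from matching the records of `tx10`.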
+
+#### Secondary indexes (auto-maintained by the metadata store)
+
+```
+idx:writes-by-segment              on /txn-op/* where kind=write
+                                   key = segment
+                                   →  range query "writes touching segment S"
+
+idx:acks-by-segment-subscription   on /txn-op/* where kind=ack
+                                   key = (segment, subscription)
+                                   →  range query "acks on (segment S, subscription SU)"
+
+idx:txn-by-deadline                on /txn/* where state=OPEN
+                                   key = timeout_ms
+                                   →  range query "open txns past deadline"
+                                   →  used by TC for timeout-driven abort
+
+idx:txn-by-final-state             on /txn/* where state ∈ {COMMITTED, ABORTED}
+                                   key = (state, finalized_ms)
+                                   →  range query "finalized txns ready for GC"
+                                   →  used by GC sweep to find finalized txns whose op records can be deleted
+```
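The two op indexes can be modeled as sorted maps with composite keys, assuming the backend updates them in the same atomic step as the record. `TxnOp` and `OpIndexes` are hypothetical; the NUL (`\u0000`) separator is an assumption that keeps `seg-1` from matching `seg-10` in a prefix range scan.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** One /txn-op record; subscription is null for writes (hypothetical type). */
final class TxnOp {
    final String kind;
    final String segment;
    final String subscription;
    final long ledgerId;
    final long entryId;

    TxnOp(String kind, String segment, String subscription, long ledgerId, long entryId) {
        this.kind = kind;
        this.segment = segment;
        this.subscription = subscription;
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

/** Hypothetical model of idx:writes-by-segment and idx:acks-by-segment-subscription. */
final class OpIndexes {
    // NUL separator: "seg-1" + SEP can never prefix "seg-10" + SEP in a range scan.
    private static final char SEP = '\u0000';
    private final NavigableMap<String, TxnOp> writesBySegment = new TreeMap<>();
    private final NavigableMap<String, TxnOp> acksBySegmentSub = new TreeMap<>();

    /** Index key: secondary key components, then the record key to keep entries unique. */
    private static String indexKey(String opKey, TxnOp op) {
        return "write".equals(op.kind)
                ? op.segment + SEP + opKey
                : op.segment + SEP + op.subscription + SEP + opKey;
    }

    private NavigableMap<String, TxnOp> indexFor(TxnOp op) {
        return "write".equals(op.kind) ? writesBySegment : acksBySegmentSub;
    }

    /** Same atomic step as the record put (backend-maintained in the real design). */
    void onPut(String opKey, TxnOp op) {
        indexFor(op).put(indexKey(opKey, op), op);
    }

    /** Same atomic step as the record delete, so the index self-cleans. */
    void onDelete(String opKey, TxnOp op) {
        indexFor(op).remove(indexKey(opKey, op));
    }

    /** Range query "writes touching segment S". */
    List<TxnOp> writesTouching(String segment) {
        return new ArrayList<>(prefix(writesBySegment, segment + SEP).values());
    }

    /** Range query "acks on (segment S, subscription SU)". */
    List<TxnOp> acksOn(String segment, String subscription) {
        return new ArrayList<>(prefix(acksBySegmentSub, segment + SEP + subscription + SEP).values());
    }

    private static NavigableMap<String, TxnOp> prefix(NavigableMap<String, TxnOp> m, String p) {
        return m.subMap(p, true, p + Character.MAX_VALUE, false);
    }
}
```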
+
+#### Garbage collection
+
+A finalized transaction (`COMMITTED` or `ABORTED`) is removed in two phases:
+
+1. **Per-participant materialization.** When the TC fans out end-txn, each 
participant broker materializes the decision (commit: advance subscription 
cursors for acks, evict header cache; abort: drop ops). Once a participant has 
finished its materialization for `<txnId>`, it deletes its op records 
(`/txn-op/<txnId>/<seq>` for ops it owns).
+2. **Header GC sweep.** A periodic sweep scans `idx:txn-by-final-state` for entries past a configurable retention window (e.g. 60 s after `finalized_ms`). For each, it checks whether any `/txn-op/<txnId>/*` records remain (orphans left by a participant that crashed before cleaning up), force-deletes them, and finally deletes the header `/txn/<txnId>`.
+
+Because all of a txn's records share the same partition (`partitionKey = 
txnId`), the GC sweep's per-txn cleanup stays in one partition: list 
`/txn-op/<txnId>/`, delete, then delete the header.
+
+Indexes update transactionally with the underlying records, so they self-clean.
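The two-phase GC can be sketched as follows, assuming millisecond clocks passed in as plain `long`s and a `TreeMap` standing in for `idx:txn-by-final-state`. `TxnGc` and its methods are hypothetical; each op is tagged with the segment that owns it, so phase 1 deletes only a participant's own records.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical in-memory model of the two-phase GC. */
final class TxnGc {
    final Map<String, Long> headers = new HashMap<>();                  // /txn/<txnId> -> finalized_ms
    final TreeMap<Long, List<String>> byFinalizedMs = new TreeMap<>(); // idx:txn-by-final-state
    final TreeMap<String, String> ops = new TreeMap<>();               // /txn-op/<txnId>/<seq> -> owning segment

    void addOp(String txnId, long seq, String segment) {
        ops.put(String.format("/txn-op/%s/%010d", txnId, seq), segment);
    }

    void markFinalized(String txnId, long finalizedMs) {
        headers.put(txnId, finalizedMs);
        byFinalizedMs.computeIfAbsent(finalizedMs, k -> new ArrayList<>()).add(txnId);
    }

    /** Phase 1: after materializing, a participant deletes only the op records it owns. */
    void participantCleanup(String txnId, String segment) {
        opsOf(txnId).values().removeIf(segment::equals);
    }

    /** Phase 2: for txns finalized at least retentionMs ago, drop leftovers, then the header. */
    int sweep(long nowMs, long retentionMs) {
        NavigableMap<Long, List<String>> due = byFinalizedMs.headMap(nowMs - retentionMs, true);
        int deleted = 0;
        for (List<String> txnIds : due.values()) {
            for (String txnId : txnIds) {
                opsOf(txnId).clear(); // orphans from a participant that crashed before phase 1
                headers.remove(txnId);
                deleted++;
            }
        }
        due.clear(); // index entries disappear with their headers
        return deleted;
    }

    private NavigableMap<String, String> opsOf(String txnId) {
        String prefix = "/txn-op/" + txnId + "/";
        return ops.subMap(prefix, true, prefix + Character.MAX_VALUE, false);
    }
}
```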
+
+### Components
+
+#### `MetadataTransactionBuffer` (new)
+
+Implements the existing `TransactionBuffer` interface. Used for `segment://` 
topics.
+
+| Method | Behavior |
+|---|---|
+| `appendBufferToTxn(txnId, buf)` | `ML.asyncAddEntry(buf)`; on success, 
append a sequential `/txn-op/<txnId>/<seq>` (`partitionKey=txnId`) with 
`kind="write", segment, position`. The publish ack waits for both. |
+| `commit(txnId, position)` / `abort(...)` | Not invoked by the v5 TC (which 
does not RPC participants). The TB's header watch fires when 
`/txn/<txnId>.state` changes; the TB then materializes locally (evict / 
mark-aborted) and deletes its owned op records. |
+| `getMaxReadPosition()` | Read from in-memory cache. Cache is populated by a 
watch on `idx:writes-by-segment == <my-segment>` joined against the header 
cache. Result: `min(position over OPEN txns) - 1`, capped at LAC. |
+| `isTxnAborted(msg)` | Look up `/txn/<txnId>.state` from header cache. |
+| `recover()` | Open the index watch and the header cache; populate from the 
current snapshot. No log replay, no snapshot topic. |
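A sketch of the `getMaxReadPosition()` computation, assuming `Pos` as a hypothetical stand-in for Pulsar's position type. One simplification: `previous()` returns `(ledgerId, entryId - 1)` instead of walking back to the previous ledger, which still compares correctly against real entries. Headers missing from the cache are treated as `OPEN`, the conservative choice.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical (ledgerId, entryId) position. */
final class Pos implements Comparable<Pos> {
    final long ledgerId;
    final long entryId;

    Pos(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public int compareTo(Pos o) {
        int c = Long.compare(ledgerId, o.ledgerId);
        return c != 0 ? c : Long.compare(entryId, o.entryId);
    }

    /** Simplified predecessor: (L, -1) sorts after every entry of earlier ledgers and before (L, 0). */
    Pos previous() {
        return new Pos(ledgerId, entryId - 1);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof Pos && compareTo((Pos) o) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(ledgerId, entryId);
    }

    @Override
    public String toString() {
        return ledgerId + ":" + entryId;
    }
}

enum HeaderState { OPEN, COMMITTED, ABORTED }

final class MaxReadPosition {
    private MaxReadPosition() {}

    /** writesByTxn: this segment's writes (idx:writes-by-segment) grouped by txnId; lac: last add confirmed. */
    static Pos compute(Map<String, List<Pos>> writesByTxn, Map<String, HeaderState> headerCache, Pos lac) {
        Optional<Pos> firstOpenWrite = writesByTxn.entrySet().stream()
                // Unknown header => assume OPEN, so data we cannot classify is never exposed.
                .filter(e -> headerCache.getOrDefault(e.getKey(), HeaderState.OPEN) == HeaderState.OPEN)
                .flatMap(e -> e.getValue().stream())
                .min(Comparator.naturalOrder());
        if (!firstOpenWrite.isPresent()) {
            return lac;                                       // nothing pins the horizon
        }
        Pos candidate = firstOpenWrite.get().previous();      // min(position over OPEN txns) - 1
        return candidate.compareTo(lac) < 0 ? candidate : lac; // capped at LAC
    }
}
```

Committed transactions stop pinning as soon as their header flips; aborted ones stop pinning too, and their entries are hidden by `isTxnAborted` instead.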
+
+#### `MetadataPendingAckStore` (new)
+
+Implements the existing `PendingAckStore` interface. Used for `segment://` 
topic subscriptions.
+
+| Method | Behavior |
+|---|---|
+| `appendIndividualAck(txnId, positions)` | Append sequential 
`/txn-op/<txnId>/<seq>` records with `kind="ack", segment, subscription, 
position`. |
+| `appendCumulativeAck(...)` | Same shape, single op record carrying the 
cumulative position. |
+| `commit(txnId)` / `abort(txnId)` | Not invoked by the v5 TC. Triggered 
locally when the header watch on `/txn/<txnId>.state` fires. Commit: 
range-query `idx:acks-by-segment-subscription == (<my-segment>, 
<my-subscription>)` filtered to `<txnId>`; apply to cursor (`markDelete` or 
`individualAck`); range-delete the op records. Abort: range-delete the op 
records, no cursor work. |
+| `replayAsync()` (recovery) | Range-query `idx:acks-by-segment-subscription 
== (<my-segment>, <my-subscription>)`, group by `txnId`, hydrate in-memory 
state. |
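A sketch of the pending-ack bookkeeping for one `(segment, subscription)`, assuming positions are plain `long` offsets and the cursor is reduced to a mark-delete position plus a set of individually acked positions. `PendingAcks` is hypothetical. Commit and abort remove the transaction's entry first, so a repeated notification is a no-op.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical pending-ack bookkeeping for one (segment, subscription). */
final class PendingAcks {
    static final class Ack {
        final String txnId;
        final long position;
        final boolean cumulative;

        Ack(String txnId, long position, boolean cumulative) {
            this.txnId = txnId;
            this.position = position;
            this.cumulative = cumulative;
        }
    }

    private final Map<String, List<Ack>> byTxn = new HashMap<>();
    // Cursor stand-in: mark-delete position plus individually acked positions above it.
    long markDelete = -1;
    final TreeSet<Long> individuallyAcked = new TreeSet<>();

    /** replayAsync(): rows from idx:acks-by-segment-subscription, grouped by txnId. */
    void replay(List<Ack> indexRows) {
        for (Ack a : indexRows) {
            byTxn.computeIfAbsent(a.txnId, k -> new ArrayList<>()).add(a);
        }
    }

    /** Header watch saw COMMITTED: apply the buffered acks to the cursor. */
    void onCommitted(String txnId) {
        List<Ack> acks = byTxn.remove(txnId);
        if (acks == null) {
            return; // already materialized (repeated notification)
        }
        for (Ack a : acks) {
            if (a.cumulative) {
                markDelete = Math.max(markDelete, a.position);
                individuallyAcked.headSet(markDelete, true).clear(); // now covered by markDelete
            } else if (a.position > markDelete) {
                individuallyAcked.add(a.position);
            }
        }
    }

    /** Header watch saw ABORTED: drop the buffered acks, no cursor work. */
    void onAborted(String txnId) {
        byTxn.remove(txnId);
    }

    int pendingTxns() {
        return byTxn.size();
    }
}
```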
+
+#### Transaction Coordinator V5 (new)
+
+A parallel coordinator selected by the v5 client. Same client-facing wire 
commands (`NEW_TXN`, `ADD_PARTITION_TO_TXN`, `ADD_SUBSCRIPTION_TO_TXN`, 
`END_TXN`), but no system-topic log: every operation reads or CAS's a 
metadata-store record. **The TC does not RPC participants** — see "Notification 
mechanism" below.
+
+| Operation | Behavior |
+|---|---|
+| `newTxn(timeoutMs)` | Create `/txn/<txnId>` with `state=OPEN`, 
`timeout_ms=now+timeoutMs`. |
+| `addPartitionToTxn` / `addSubscriptionToTxn` | No-op at the coordinator. The 
participant broker writes its own op records when the actual write/ack arrives; 
the TC never needs to enumerate participants. |
+| `endTxn(COMMIT\|ABORT)` | A single CAS on `/txn/<txnId>` that sets `state` to `COMMITTED` or `ABORTED` and stamps `finalized_ms` in the same put; the TC then acks the client. No fan-out, no waiting on participants. |
+| Timeout sweep | Range-query `idx:txn-by-deadline` for entries with 
`timeout_ms ≤ now`, abort each (same single-CAS flow). |
+| GC sweep | Range-query `idx:txn-by-final-state` for entries past retention; 
for each, verify `/txn-op/<txnId>/*` is empty (force-delete leftovers); delete 
header. |
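The timeout sweep and a client commit race on the same header. The following sketch shows that exactly one of them wins, assuming an `AtomicReference` per header as a stand-in for the CAS on the metadata record. `TimeoutSweep` and `TxnStatus` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

enum TxnStatus { OPEN, COMMITTED, ABORTED }

/** Hypothetical model of the v5 TC's endTxn and timeout sweep. */
final class TimeoutSweep {
    // One AtomicReference per header: compareAndSet plays the role of the metadata-store CAS.
    final Map<String, AtomicReference<TxnStatus>> headers = new ConcurrentHashMap<>();
    // Stand-in for idx:txn-by-deadline (timeout_ms -> txnIds), guarded by this object's lock.
    private final NavigableMap<Long, List<String>> byDeadline = new TreeMap<>();

    synchronized void newTxn(String txnId, long nowMs, long timeoutMs) {
        headers.put(txnId, new AtomicReference<>(TxnStatus.OPEN));
        byDeadline.computeIfAbsent(nowMs + timeoutMs, k -> new ArrayList<>()).add(txnId);
    }

    /** endTxn: a single CAS; false means the txn was already finalized by someone else. */
    boolean endTxn(String txnId, TxnStatus target) {
        return headers.get(txnId).compareAndSet(TxnStatus.OPEN, target);
    }

    /** Aborts every txn whose timeout_ms <= now; returns the ids this sweep actually aborted. */
    synchronized List<String> sweep(long nowMs) {
        NavigableMap<Long, List<String>> due = byDeadline.headMap(nowMs, true);
        List<String> aborted = new ArrayList<>();
        for (List<String> ids : due.values()) {
            for (String id : ids) {
                if (endTxn(id, TxnStatus.ABORTED)) { // fails if a client commit landed first
                    aborted.add(id);
                }
            }
        }
        due.clear(); // a real index drops entries as soon as state leaves OPEN
        return aborted;
    }
}
```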
+
+Why parallel rather than reusing the legacy TC: the legacy TC's per-shard 
system topic (`__transaction_log_*`) requires leadership election, runs 
compaction over its own log, and pays a recovery cost on every broker restart 
proportional to the live transaction set. The v5 TC's state is just per-txn KV 
records — there is no log to compact and no cold-start replay. Running both in 
parallel keeps v4 transactions byte-for-byte unchanged while the v5 path uses 
the simpler design. A v5 client routes its `NEW_TXN` to the v5 TC; v4 clients 
route to the legacy TC. A single transaction does not span the two.
+
+#### Notification mechanism (TC → participants)
+
+The legacy TC needs to RPC each participant (`END_TXN_ON_PARTITION`, 
`END_TXN_ON_SUBSCRIPTION`) because the participants have no other way to learn 
the decision — the TC's log is the only source of truth, and only the TC reads 
it.
+
+In the v5 design **the metadata store is the source of truth**, and every 
participant already reads from it. Participants therefore learn about state 
transitions directly from the store, without any TC-to-broker RPC:
+
+- A `MetadataTransactionBuffer` keeps an in-memory header cache for txns it 
has writes from. The cache entries are populated when a write op record is 
appended (the broker reads the header to authorize the write) and **kept up to 
date by point-watches on the headers it has cached**.
+- A `MetadataPendingAckStore` maintains the same pattern for txns it has acks 
from.
+- When the TC CASes `/txn/<txnId>.state` from OPEN → COMMITTED/ABORTED, the watch of every participant that has cached that header fires. Each participant materializes locally:
+  - **Commit** — TB evicts its cache entry (the txn no longer pins 
`maxReadPosition` back); PendingAckStore applies the buffered acks to the 
cursor.
+  - **Abort** — TB marks the txn aborted in its cache (the dispatcher's 
`isTxnAborted` will skip those entries); PendingAckStore drops the buffered 
acks.
+- After materialization, the participant deletes the op records it owns 
(`/txn-op/<txnId>/<seq>` for ops on its segment / subscription).
+- The TC's GC sweep (above) detects when all participants have done their 
cleanup — the prefix `/txn-op/<txnId>/*` is empty — and deletes the header.
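The watch-driven flow can be sketched with a toy store that supports point-watches. `WatchableStore` and `Participant` are hypothetical; the participant re-reads the header right after registering its watch, so a CAS that lands in between is not missed, and it ignores repeated notifications.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

/** Toy metadata store with point-watches: listeners fire on every put to a watched key. */
final class WatchableStore {
    private final Map<String, String> values = new ConcurrentHashMap<>();
    private final Map<String, List<BiConsumer<String, String>>> watches = new ConcurrentHashMap<>();

    void watch(String key, BiConsumer<String, String> listener) {
        watches.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    void put(String key, String value) {
        values.put(key, value);
        for (BiConsumer<String, String> l : watches.getOrDefault(key, Collections.emptyList())) {
            l.accept(key, value);
        }
    }

    String get(String key) { return values.get(key); }
    boolean exists(String key) { return values.containsKey(key); }
    void delete(String key) { values.remove(key); }
}

/** Hypothetical TB-side participant that learns outcomes only from header watches. */
final class Participant {
    final Set<String> openTxns = ConcurrentHashMap.newKeySet();    // pin maxReadPosition
    final Set<String> abortedTxns = ConcurrentHashMap.newKeySet(); // consulted by isTxnAborted
    private final Map<String, List<String>> ownedOps = new ConcurrentHashMap<>();
    private final WatchableStore store;

    Participant(WatchableStore store) {
        this.store = store;
    }

    void recordWrite(String txnId, String opKey) {
        store.put(opKey, "write");
        ownedOps.computeIfAbsent(txnId, k -> new CopyOnWriteArrayList<>()).add(opKey);
        if (openTxns.add(txnId)) {
            String header = "/txn/" + txnId;
            store.watch(header, (key, state) -> materialize(txnId, state));
            // Re-read after registering: a CAS that landed before the watch existed is not missed.
            materialize(txnId, store.get(header));
        }
    }

    private void materialize(String txnId, String state) {
        if (state == null || "OPEN".equals(state) || !openTxns.remove(txnId)) {
            return; // not final yet, or already materialized
        }
        if ("ABORTED".equals(state)) {
            abortedTxns.add(txnId);
        }
        // COMMITTED needs no extra step here: leaving openTxns unpins maxReadPosition.
        List<String> ops = ownedOps.remove(txnId);
        if (ops != null) {
            ops.forEach(store::delete);
        }
    }
}
```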
+
+Consequences:
+
+- **End-txn latency.** From the client's perspective, `commit` returns as soon 
as the header CAS lands. From a consumer's perspective, freshly-committed 
entries become visible after the participant's header watch fires + 
materialization runs. That's typically tens of milliseconds; bounded by 
metadata-store watch propagation. (If we ever care about a tighter bound — e.g. 
for a given workload — the TC can issue an optional `nudge` RPC to participants 
in parallel with the CAS. Not needed for correctness; not in this PIP.)
+- **No RPC fan-out from TC.** End-txn is `O(1)` work at the TC: one CAS. The 
cost of fan-out is paid by the metadata store's watch-delivery infrastructure, 
which already exists for other Pulsar uses.
+- **Crash idempotence.** A participant that crashes during materialization restarts, finds the (already-final) header state when it repopulates its header cache in `recover()`, and finishes materialization. The TC need not retry anything.
+
+#### Dispatcher
+
+Unchanged. It already asks `topic.getTransactionBuffer().getMaxReadPosition()` 
and `topic.getTransactionBuffer().isTxnAborted(...)`. The new TB implements 
both.
+
+### Flows
+
+#### Publish (transactional)
+
+```mermaid
+sequenceDiagram
+    participant C as Client
+    participant B as Segment broker
+    participant ML as Managed Ledger
+    participant M as Metadata Store
+
+    C->>B: send(txnId, payload)
+    B->>M: read /txn/<txnId>.state  (cached)
+    alt state != OPEN
+        B-->>C: TxnConflict
+    else state == OPEN
+        B->>ML: asyncAddEntry(payload)
+        ML-->>B: position
+        B->>M: put /txn-op/<txnId>/<seq> {kind=write, segment, position}
+        M-->>B: ack
+        B-->>C: send-ack
+    end
+```
+
+The header read is cache-first; the cache is invalidated by the same watch the 
TB already maintains on the header. The op-record put is the only synchronous 
metadata-store write on the publish path.
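A sketch of the publish ordering with `CompletableFuture`, assuming hypothetical `Ledger` and `OpStore` interfaces in place of the managed ledger and metadata store. The send-ack future completes only after both the entry append and the op-record put succeed; a header cache miss is treated as a conflict here for brevity, where a real broker would read `/txn/<txnId>`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical async stand-ins for the managed ledger and the metadata store. */
interface Ledger {
    CompletableFuture<Long> addEntry(byte[] payload); // completes with the entry position
}

interface OpStore {
    CompletableFuture<String> putSequential(String prefix, String value); // completes with the key
}

final class TxnConflictException extends RuntimeException {
    TxnConflictException(String message) {
        super(message);
    }
}

final class TxnPublish {
    private TxnPublish() {}

    /** send(txnId, payload): the returned future (the send-ack) waits for both writes. */
    static CompletableFuture<Long> send(Map<String, String> headerCache, Ledger ledger, OpStore ops,
                                        String txnId, String segment, byte[] payload) {
        // Cache miss is treated as a conflict here; a real broker would read /txn/<txnId>.
        if (!"OPEN".equals(headerCache.get(txnId))) {
            CompletableFuture<Long> rejected = new CompletableFuture<>();
            rejected.completeExceptionally(new TxnConflictException("txn " + txnId + " is not OPEN"));
            return rejected;
        }
        return ledger.addEntry(payload)                     // 1. exactly one managed-ledger append
                .thenCompose(position -> ops.putSequential( // 2. op record, once the position is known
                        "/txn-op/" + txnId + "/",
                        "kind=write segment=" + segment + " position=" + position)
                        .thenApply(key -> position));       // 3. ack with the entry position
    }
}
```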
+
+#### End-txn (commit or abort)
+
+```mermaid
+sequenceDiagram
+    participant Cl as Client
+    participant TC as Transaction Coordinator V5
+    participant M as Metadata Store
+    participant P as Participant brokers
+
+    Cl->>TC: commit(txnId)
+    TC->>M: CAS /txn/<txnId>.state OPEN→COMMITTED, set finalized_ms
+    M-->>TC: ack
+    TC-->>Cl: ack
+
+    Note over M,P: Independently, asynchronously:
+    M-->>P: header watch fires
+    P->>P: materialize (cursor advance / cache evict)
+    P->>M: delete owned /txn-op/<txnId>/<seq> records
+
+    Note over TC,M: Later, GC sweep:
+    TC->>M: list /txn-op/<txnId>/*  (empty? then delete header)
+```
+
+The CAS on the header is the linearization point — that is when the 
transaction's outcome is decided. Notification of participants is not part of 
the linearization; it propagates via the watches every participant already 
maintains on the headers it has cached. Sealed segments are fine — 
materialization is metadata + cursor work, no managed-ledger writes.
+
+#### Subscribe / dispatch
+
+Unchanged. The dispatcher polls `tb.getMaxReadPosition()` and filters by 
`tb.isTxnAborted(msg)`. The `MetadataTransactionBuffer` answers both from its 
in-memory caches, fed by metadata-store watches.
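The dispatcher-side contract amounts to a cap plus a filter. A sketch, assuming positions are `long` offsets and entries arrive in position order; `DispatchFilter` is hypothetical and the `abortedTxns` set plays the role of `isTxnAborted`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical read-path filter: cap at maxReadPosition, then skip aborted txns. */
final class DispatchFilter {
    static final class Entry {
        final long position;
        final String txnId; // null for non-transactional entries

        Entry(long position, String txnId) {
            this.position = position;
            this.txnId = txnId;
        }
    }

    private DispatchFilter() {}

    static List<Long> deliverable(List<Entry> read, long maxReadPosition, Set<String> abortedTxns) {
        List<Long> out = new ArrayList<>();
        for (Entry e : read) {
            if (e.position > maxReadPosition) {
                break;    // entries are in position order; nothing past the horizon is visible
            }
            if (e.txnId != null && abortedTxns.contains(e.txnId)) {
                continue; // isTxnAborted(...) == true
            }
            out.add(e.position);
        }
        return out;
    }
}
```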

Review Comment:
   There's currently a performance issue (either additional latency or a CPU spin loop) when the dispatcher polls `getMaxReadPosition`. This happens once the dispatcher has read all entries up to `maxReadPosition`.
   
   For example, for the read operation in `PersistentDispatcherMultipleConsumers.readMoreEntries`:
   
https://github.com/apache/pulsar/blob/138595f6256c301956f9d77fde8534699e992536/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L420-L421
   
   The read will complete here:
   
https://github.com/apache/pulsar/blob/a39d241b0883df04ce6294e48d195b1be37e3308/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2314-L2317
   
   After completion, a new read will be issued. This loop will keep on going 
until the maxReadPosition moves forward.
   
   A better solution would be for the transaction component to notify the dispatcher when `maxReadPosition` has advanced, so that there's no need to poll.
   
   Btw, the current dispatcher code should be optimized for v4 transactions as well; even with polling, there's room for improvement.
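A minimal sketch of the push model suggested above, assuming hypothetical `MaxReadPositionNotifier` and `ParkingDispatcher` classes and `long` positions (none of these are existing Pulsar APIs): the dispatcher parks when it has caught up, and the TB wakes it only when `maxReadPosition` moves forward.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.LongConsumer;

/** Hypothetical push API: the TB notifies listeners when maxReadPosition moves forward. */
final class MaxReadPositionNotifier {
    private final List<LongConsumer> listeners = new CopyOnWriteArrayList<>();
    private volatile long maxReadPosition;

    MaxReadPositionNotifier(long initial) {
        this.maxReadPosition = initial;
    }

    long get() {
        return maxReadPosition;
    }

    void addListener(LongConsumer listener) {
        listeners.add(listener);
    }

    /** Called by the TB when a commit or abort unpins the horizon. */
    synchronized void advanceTo(long newPosition) {
        if (newPosition <= maxReadPosition) {
            return; // only forward moves notify
        }
        maxReadPosition = newPosition;
        for (LongConsumer l : listeners) {
            l.accept(newPosition);
        }
    }
}

/** Dispatcher that parks when caught up instead of re-issuing reads in a loop. */
final class ParkingDispatcher {
    private final MaxReadPositionNotifier tb;
    private long readPosition; // last position handed to consumers
    private boolean parked;
    int readsIssued;

    ParkingDispatcher(MaxReadPositionNotifier tb, long readPosition) {
        this.tb = tb;
        this.readPosition = readPosition;
    }

    void start() {
        tb.addListener(position -> onAdvanced());
    }

    synchronized void readMoreEntries() {
        long max = tb.get();
        if (readPosition >= max) {
            parked = true; // caught up: no read is issued, so no spin
            return;
        }
        readsIssued++;
        readPosition = max; // stand-in for reading (readPosition, maxReadPosition]
    }

    private synchronized void onAdvanced() {
        if (parked) {
            parked = false;
            readMoreEntries();
        }
    }
}
```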



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
