https://github.com/apache/pulsar/pull/25693
------
# PIP-471: Metadata-Driven Transactions for Scalable Topics
*Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*
## Background
### Pulsar's existing transaction model
Pulsar transactions today are realized through three components:
- **Transaction Coordinator (TC)** — a per-broker service backed by a
system topic (`__transaction_log_*` in `pulsar/system`) that tracks
the lifecycle of every transaction (`OPEN`, `COMMITTING`, `COMMITTED`,
`ABORTING`, `ABORTED`, `TIME_OUT`) and orchestrates two-phase commit
across the topics that participate in each transaction.
- **TransactionBuffer (TB)** — a per-`PersistentTopic` component that
buffers transactional writes in the topic's data stream, tracks
aborted transaction IDs, and gates the dispatcher's read horizon
(`maxReadPosition`) so that uncommitted entries are not delivered. The
TB persists its state in a per-namespace system topic
(`__transaction_buffer_snapshot`).
- **PendingAckStore** — a per-(topic, subscription) component that
records transactional acknowledgments in a sibling persistent topic
(`<topic>-<sub>__transaction_pending_ack`), applying them to the
cursor only when the transaction commits.
When a transaction ends, the TC sends `END_TXN_ON_PARTITION` (and
`END_TXN_ON_SUBSCRIPTION` for acks) to every participant. The TB then
writes a **commit or abort marker** as a regular entry in the topic's
managed ledger. The dispatcher discovers committed/aborted state by
replaying these markers and consulting the in-memory aborted-txn set.
### Scalable topics
[PIP-460](pip-460.md) introduces scalable topics: a logical topic
backed by a DAG of range segments (`segment://...`) that can be split
or merged at runtime. Each segment is a regular `PersistentTopic` from
the broker's perspective, but the segment's lifetime is controlled by
the [scalable topic controller](pip-468.md) — segments get **sealed**
when split or merged, after which the segment's managed ledger no
longer accepts writes.
### How the two interact
The current transaction implementation composes per-`PersistentTopic`.
With scalable topics, every segment carries its own TB. This
composition fails in two ways:
1. **End-of-transaction stalls on sealed segments.** The TC sends
`END_TXN_ON_PARTITION` to each segment that received writes. The
segment's TB tries to append a commit/abort marker — which is a write
— and the now-sealed segment rejects it. The end-txn RPC times out
(~30s).
2. **Pending-ack topic naming collides with the segment-domain
parser.** The convention `<topic>-<sub>__transaction_pending_ack` is
unparseable when `<topic>` is a `segment://...` URI. (Worked around in
#25631 with a flat persistent name; see "Out of Scope" below.)
The first issue is structural, not just a routing bug. As long as
commit/abort decisions need to be persisted **inside the topic's data
stream**, sealing the topic terminates any in-flight transaction.
---
## Motivation
We need transactions that:
1. Provide atomicity across multiple writes and acknowledgments,
possibly spanning multiple topics across multiple namespaces.
2. Compose correctly with the scalable-topic lifecycle — including
splits, merges, and segments sealed mid-transaction.
3. Do not require duplicating data (each `producer.send` produces a
single managed-ledger append).
4. Reuse as much of the existing transaction surface as possible —
interfaces, dispatcher integration, client API — so that we are not
re-litigating well-understood concerns.
5. Coexist with v4 transactions on `persistent://` topics with no
behavior change for those topics.
The structural mismatch between in-stream markers and a mutable
segment DAG cannot be papered over at the routing or the topic-naming
layer. It needs a transaction representation that does not put the
decision record inside the data stream.
---
## Goals
### In Scope
- Atomic transactions over `segment://` topics (writes and acks),
including transactions whose lifetime spans split/merge.
- Multi-topic, multi-namespace, multi-segment transactions with the
same atomicity guarantees as today.
- Reuse of the existing `Transaction`, `TransactionCoordinator`,
`TransactionBuffer`, `PendingAckStore`, dispatcher, and client APIs.
New behavior arrives as alternative implementations behind the
existing interfaces.
- Coexistence with the legacy in-stream-marker implementation for
`persistent://` topics.
### Out of Scope
- Replacing the legacy implementation for non-scalable topics. The new
implementation is opt-in per topic; `persistent://` topics keep their
current behavior, including the existing TC.
- Replacing the segment-aware pending-ack topic name introduced in
#25631 — that workaround becomes unnecessary as a side effect of this
PIP and is removed in the same change.
- Cross-cluster (geo-replicated) transactional semantics.
---
## High Level Design
The proposal is one sentence:
> **Move transactional state out of the data stream and into the metadata
> store.**
Concretely: keep all existing components and interfaces, and add a
parallel implementation of `TransactionBuffer`, `PendingAckStore`,
**and Transaction Coordinator** that writes nothing to any data
stream. Their state lives entirely in the metadata store. The legacy
in-stream-marker components remain, unchanged, for `persistent://`
topics; the new metadata-driven components handle `segment://` topics.
The dispatcher's contract is unchanged.
Why introduce a v5 TC rather than reuse the legacy one: the legacy TC
stores its log in a system topic (`__transaction_log_*`), which
carries the operational concerns of any system topic — compaction can
lead to long recovery times, leadership has to be maintained, and
recovery is on the data path. With the metadata store available we can
have a TC whose state is just a few key-value records, no log, no
system topic, no per-broker in-memory replay. Running both TC
implementations in parallel keeps v4 transactions byte-for-byte
unchanged while the v5 path uses the simpler design.
### Why this works for scalable topics
- **Sealing a segment is irrelevant.** Commit/abort no longer require
any append to the segment. End-txn becomes a metadata-store CAS on a
single record. Sealed segments materialize the decision (advance
cursors, evict cache entries) without writing anything.
- **The dispatcher does not change.** It already asks the topic's TB
for `maxReadPosition` and `isTxnAborted`. We swap the source.
- **Splits/merges do not strand transactions.** Sealed parents and
live children both consult the same metadata; the decision lives above
the segments.
- **No data is duplicated.** Each transactional `send` produces
exactly one managed-ledger append, same as today.
### Architecture Overview
```
┌─────────────────────────────────────────────────────────────────────┐
│  Client (V5)  -- producer.send(txn,...)                             │
│               -- consumer.acknowledge(id, txn)                      │
└──────────────────────────────────┬──────────────────────────────────┘
                                   │
              ┌────────────────────┴────────────────────┐
              │                                         │
┌─────────────▼────────────────┐       ┌────────────────▼─────────────┐
│ Transaction Coordinator      │       │ Transaction Coordinator V5   │
│ (legacy, BK-backed log)      │       │ (metadata-store records)     │
│ → v4 / persistent:// txns    │       │ → v5 / segment:// txns       │
└─────────────┬────────────────┘       └────────────────┬─────────────┘
              │   END_TXN_ON_PARTITION / SUBSCRIPTION   │
              ▼                                         ▼
┌─────────────────────────────────────────────────────────────────────┐
│                     Per-topic broker components                     │
│                                                                     │
│  ┌────────────────────────────┐     ┌────────────────────────────┐  │
│  │ TopicTransactionBuffer     │     │ MLPendingAckStore          │  │
│  │ (in-stream markers)        │     │ (sibling topic)            │  │
│  │ → persistent:// topics     │     │ → persistent:// topics     │  │
│  └────────────────────────────┘     └────────────────────────────┘  │
│                                                                     │
│  ┌────────────────────────────┐     ┌────────────────────────────┐  │
│  │ MetadataTransactionBuffer  │     │ MetadataPendingAckStore    │  │
│  │ (metadata-store records)   │     │ (metadata-store records)   │  │
│  │ → segment:// topics        │     │ → segment:// topics        │  │
│  └──────────┬─────────────────┘     └─────────────────┬──────────┘  │
└─────────────┼─────────────────────────────────────────┼─────────────┘
              │                                         │
              ▼                                         ▼
     Metadata Store: txn coordinator state + txn-op records +
                     secondary indexes
```
The `TransactionBufferProvider` and
`TransactionPendingAckStoreProvider` SPIs already exist. The new TB /
PendingAckStore implementations slot in behind them. The v5 TC is a
parallel coordinator selected by the client when it is configured for
the new path. Selection on the participant side is per-topic, based on
the topic's domain.
---
## Detailed Design
### Data Model
The metadata store holds two classes of records and four secondary
indexes. All records for a given transaction share the same
**partition key** (`txnId`) so they are co-located — this makes
per-txn scans (e.g. listing all ops to apply at end-txn time) a
single-partition operation rather than a fan-out.
> **A note on metadata-store backends.** The design is
> `MetadataStore`-agnostic. It depends on three capabilities — partition-key
> co-location, sequential keys, and secondary indexes with range queries and
> range-watch — that the `MetadataStore` interface does not expose today. We
> extend the interface to surface them; backends that natively support these
> (notably Oxia, the intended default) implement them directly, while backends
> that don't (e.g. ZooKeeper) can implement them in a less efficient way
> (client-side counters for sequential IDs; client-maintained index records;
> periodic re-list in lieu of range-watch). Correctness does not depend on
> backend choice; throughput and recovery latency may.
#### Header — one per transaction. Linearization point.
```
/txn/<txnId>                          partitionKey = txnId
= {
    state:        OPEN | COMMITTED | ABORTED,
    timeout_ms:   <abs epoch ms>,
    created_ms:   <abs epoch ms>,
    finalized_ms: <abs epoch ms>      # set when the txn is finalized; absent while OPEN
  }
```
State transitions are conditional puts (CAS on version) issued by the
v5 TC. `OPEN → COMMITTED` and `OPEN → ABORTED` are the only allowed
transitions; `COMMITTED` and `ABORTED` are terminal.
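
The transition rule above can be illustrated with a minimal in-memory sketch. It is not the proposed implementation: `TxnHeaderSketch`, its methods, and the `version` field are hypothetical names, and a `ConcurrentHashMap` stands in for the metadata store's conditional put.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for the /txn/<txnId> header records. */
final class TxnHeaderSketch {
    enum State { OPEN, COMMITTED, ABORTED }

    /** Header value plus a store-assigned version used for the CAS (assumed field). */
    record Header(State state, long timeoutMs, long createdMs, long finalizedMs, long version) {}

    private final Map<String, Header> store = new ConcurrentHashMap<>();

    /** Creates the header in OPEN state; returns false if the txnId already exists. */
    boolean create(String txnId, long nowMs, long timeoutMs) {
        Header h = new Header(State.OPEN, nowMs + timeoutMs, nowMs, -1L, 0L);
        return store.putIfAbsent(txnId, h) == null;
    }

    Header get(String txnId) {
        return store.get(txnId);
    }

    /**
     * Conditional put: succeeds only if the header is still OPEN at the expected
     * version and the target is a terminal state. Terminal states never change.
     */
    boolean finalizeTxn(String txnId, long expectedVersion, State target, long nowMs) {
        if (target == State.OPEN) {
            return false;
        }
        Header cur = store.get(txnId);
        if (cur == null || cur.state() != State.OPEN || cur.version() != expectedVersion) {
            return false;
        }
        Header next = new Header(target, cur.timeoutMs(), cur.createdMs(), nowMs, cur.version() + 1);
        // replace() only swaps the value if nobody else changed it in the meantime.
        return store.replace(txnId, cur, next);
    }
}
```

In this sketch a second `finalizeTxn` call on the same transaction returns `false`, which is the behavior a losing committer or a late timeout sweep would observe.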
#### Operation records — one per transactional write or ack. Unbounded.
```
/txn-op/<txnId>/<seq>                 partitionKey = txnId,
                                      sequential = true   # server-assigned <seq>
= {
    kind:         "write" | "ack",
    segment:      "segment://t/n/x/<descriptor>",   # always present
    subscription: "<sub-fqn>",                       # ack only
    position:     <ledgerId>:<entryId>
  }
```
Each operation is its own record, so a transaction has no size limit
and concurrent participants do not contend on a single record. With
**sequential keys** the server (or, on backends that lack them, a
`MetadataStore`-side counter) assigns `<seq>`, eliminating client-side
collisions.
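
As an illustration of the counter-based fallback, the sketch below assigns `<seq>` from a per-transaction counter and lists a transaction's ops with a single prefix scan. `TxnOpLogSketch`, the zero-padded key format, and the sorted map are assumptions made for the example; a real backend would assign the sequence itself.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory stand-in for the /txn-op/<txnId>/<seq> records. */
final class TxnOpLogSketch {
    enum Kind { WRITE, ACK }

    /** subscription is null for writes, mirroring the "ack only" field. */
    record Op(Kind kind, String segment, String subscription, long ledgerId, long entryId) {}

    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();
    private final ConcurrentSkipListMap<String, Op> records = new ConcurrentSkipListMap<>();

    /** Appends one op under a fresh sequential key and returns that key. */
    String append(String txnId, Op op) {
        long seq = counters.computeIfAbsent(txnId, k -> new AtomicLong()).getAndIncrement();
        // Zero-padding (an assumption) keeps lexicographic key order equal to numeric order.
        String key = String.format(Locale.ROOT, "/txn-op/%s/%020d", txnId, seq);
        records.put(key, op);
        return key;
    }

    /** Lists all ops of one transaction in sequence order via a prefix range scan. */
    List<Op> listOps(String txnId) {
        String prefix = "/txn-op/" + txnId + "/";
        return List.copyOf(records.subMap(prefix, prefix + Character.MAX_VALUE).values());
    }
}
```

Because every append gets its own key, two brokers writing ops for the same transaction never update the same record.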
#### Secondary indexes (auto-maintained by the metadata store)
```
idx:writes-by-segment              on /txn-op/* where kind=write
                                   key = segment
                                   → range query "writes touching segment S"
idx:acks-by-segment-subscription   on /txn-op/* where kind=ack
                                   key = (segment, subscription)
                                   → range query "acks on (segment S, subscription SU)"
idx:txn-by-deadline                on /txn/* where state=OPEN
                                   key = timeout_ms
                                   → range query "open txns past deadline"
                                   → used by TC for timeout-driven abort
idx:txn-by-final-state             on /txn/* where state ∈ {COMMITTED, ABORTED}
                                   key = (state, finalized_ms)
                                   → range query "finalized txns ready for GC"
                                   → used by GC sweep to find finalized txns
                                     whose op records can be deleted
```
#### Garbage collection
A finalized transaction (`COMMITTED` or `ABORTED`) is removed in two phases:
1. **Per-participant materialization.** When a participant's header
watch reports the final state, the participant broker materializes the
decision (commit: advance subscription cursors for acks, evict the
header-cache entry; abort: drop ops). Once a participant has finished
its materialization for `<txnId>`, it deletes its op records
(`/txn-op/<txnId>/<seq>` for ops it owns).
2. **Header GC sweep.** A periodic sweep scans
`idx:txn-by-final-state` for entries past a configurable retention
window (e.g. 60 s after `finalized_ms`). For each, it verifies no
`/txn-op/<txnId>/*` records remain (orphan check from a participant
crash), forces deletion of any leftovers, and finally deletes the
header `/txn/<txnId>`.
Because all of a txn's records share the same partition (`partitionKey
= txnId`), the GC sweep's per-txn cleanup stays in one partition: list
`/txn-op/<txnId>/`, delete, then delete the header.
Indexes update transactionally with the underlying records, so they self-clean.
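
The header GC sweep can be sketched as follows. This is an in-memory illustration under stated assumptions: `TxnGcSketch` and its fields are hypothetical, sorted maps stand in for the metadata store, and a linear scan stands in for the `idx:txn-by-final-state` range query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of the phase-2 header GC sweep. */
final class TxnGcSketch {
    /** finalizedMs is negative while the transaction is still OPEN. */
    record Header(String state, long finalizedMs) {}

    final TreeMap<String, Header> headers = new TreeMap<>();
    /** Keys look like "/txn-op/<txnId>/<seq>"; a sorted map allows prefix scans. */
    final TreeMap<String, String> opRecords = new TreeMap<>();

    /** Deletes finalized txns past the retention window; returns the collected txnIds. */
    List<String> sweep(long nowMs, long retentionMs) {
        List<String> collected = new ArrayList<>();
        for (Map.Entry<String, Header> e : headers.entrySet()) {
            Header h = e.getValue();
            if (h.finalizedMs() >= 0 && nowMs - h.finalizedMs() >= retentionMs) {
                collected.add(e.getKey());
            }
        }
        for (String txnId : collected) {
            String prefix = "/txn-op/" + txnId + "/";
            // Force-delete leftovers from crashed participants, then drop the header.
            opRecords.subMap(prefix, prefix + Character.MAX_VALUE).clear();
            headers.remove(txnId);
        }
        return collected;
    }
}
```

The op records are removed before the header, so a crash between the two steps leaves a header that the next sweep picks up again.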
### Components
#### `MetadataTransactionBuffer` (new)
Implements the existing `TransactionBuffer` interface. Used for
`segment://` topics.
| Method | Behavior |
|---|---|
| `appendBufferToTxn(txnId, buf)` | `ML.asyncAddEntry(buf)`; on success, append a sequential `/txn-op/<txnId>/<seq>` (`partitionKey=txnId`) with `kind="write", segment, position`. The publish ack waits for both. |
| `commit(txnId, position)` / `abort(...)` | Not invoked by the v5 TC (which does not RPC participants). The TB's header watch fires when `/txn/<txnId>.state` changes; the TB then materializes locally (evict / mark-aborted) and deletes its owned op records. |
| `getMaxReadPosition()` | Read from in-memory cache. Cache is populated by a watch on `idx:writes-by-segment == <my-segment>` joined against the header cache. Result: `min(position over OPEN txns) - 1`, capped at LAC. |
| `isTxnAborted(msg)` | Look up `/txn/<txnId>.state` from header cache. |
| `recover()` | Open the index watch and the header cache; populate from the current snapshot. No log replay, no snapshot topic. |
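
The `getMaxReadPosition()` rule from the table can be made concrete with a small sketch. It simplifies positions to a single `long` offset within the segment (an assumption, since real positions are `<ledgerId>:<entryId>` pairs), and `MaxReadPositionSketch` with its callbacks is a hypothetical name, not the proposed class.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the read-horizon computation in a metadata-driven TB. */
final class MaxReadPositionSketch {
    enum State { OPEN, COMMITTED, ABORTED }

    private final Map<String, State> headerCache = new HashMap<>();
    /** Lowest write position per transaction that is still OPEN. */
    private final Map<String, Long> firstWriteByOpenTxn = new HashMap<>();

    /** Called when the index watch reports a write op for this segment. */
    void onWrite(String txnId, long position) {
        headerCache.putIfAbsent(txnId, State.OPEN);
        firstWriteByOpenTxn.merge(txnId, position, Math::min);
    }

    /** Called when the header watch reports a state change. */
    void onHeaderChange(String txnId, State state) {
        if (state == State.OPEN) {
            return;
        }
        firstWriteByOpenTxn.remove(txnId); // a finalized txn no longer pins the horizon
        if (state == State.COMMITTED) {
            headerCache.remove(txnId);     // commit: evict the cache entry
        } else {
            headerCache.put(txnId, State.ABORTED); // abort: remember it for filtering
        }
    }

    /** min(position over OPEN txns) - 1, capped at the last add confirmed (LAC). */
    long maxReadPosition(long lastAddConfirmed) {
        long max = lastAddConfirmed;
        for (long first : firstWriteByOpenTxn.values()) {
            max = Math.min(max, first - 1);
        }
        return max;
    }

    boolean isTxnAborted(String txnId) {
        return headerCache.get(txnId) == State.ABORTED;
    }
}
```

With writes from txn A at positions 5 and 7, txn B at 6, and LAC 10, the horizon sits at 4; it moves to 5 once A commits and to 10 once B is finalized.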
#### `MetadataPendingAckStore` (new)
Implements the existing `PendingAckStore` interface. Used for
`segment://` topic subscriptions.
| Method | Behavior |
|---|---|
| `appendIndividualAck(txnId, positions)` | Append sequential `/txn-op/<txnId>/<seq>` records with `kind="ack", segment, subscription, position`. |
| `appendCumulativeAck(...)` | Same shape, single op record carrying the cumulative position. |
| `commit(txnId)` / `abort(txnId)` | Not invoked by the v5 TC. Triggered locally when the header watch on `/txn/<txnId>.state` fires. Commit: range-query `idx:acks-by-segment-subscription == (<my-segment>, <my-subscription>)` filtered to `<txnId>`; apply to cursor (`markDelete` or `individualAck`); range-delete the op records. Abort: range-delete the op records, no cursor work. |
| `replayAsync()` (recovery) | Range-query `idx:acks-by-segment-subscription == (<my-segment>, <my-subscription>)`, group by `txnId`, hydrate in-memory state. |
#### Transaction Coordinator V5 (new)
A parallel coordinator selected by the v5 client. Same client-facing
wire commands (`NEW_TXN`, `ADD_PARTITION_TO_TXN`,
`ADD_SUBSCRIPTION_TO_TXN`, `END_TXN`), but no system-topic log: every
operation reads or CAS's a metadata-store record. **The TC does not
RPC participants** — see "Notification mechanism" below.
| Operation | Behavior |
|---|---|
| `newTxn(timeoutMs)` | Create `/txn/<txnId>` with `state=OPEN`, `timeout_ms=now+timeoutMs`. |
| `addPartitionToTxn` / `addSubscriptionToTxn` | No-op at the coordinator. The participant broker writes its own op records when the actual write/ack arrives; the TC never needs to enumerate participants. |
| `endTxn(COMMIT\|ABORT)` | A single CAS on `/txn/<txnId>` that flips `state` and sets `finalized_ms` in the same conditional put; the TC then acks the client. No fan-out, no waiting on participants. |
| Timeout sweep | Range-query `idx:txn-by-deadline` for entries with `timeout_ms ≤ now`, abort each (same single-CAS flow). |
| GC sweep | Range-query `idx:txn-by-final-state` for entries past retention; for each, verify `/txn-op/<txnId>/*` is empty (force-delete leftovers); delete header. |
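
A rough sketch of the timeout sweep follows. `TimeoutSweepSketch` is a hypothetical name, and the `TreeMap` keyed by deadline is an in-memory stand-in for `idx:txn-by-deadline`; in the design the index is maintained by the metadata store rather than by the coordinator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of newTxn / endTxn / timeout sweep in the v5 TC. */
final class TimeoutSweepSketch {
    record Header(String state, long deadlineMs) {}

    private final Map<String, Header> headers = new HashMap<>();
    /** Stand-in for idx:txn-by-deadline: only OPEN txns are indexed. */
    private final TreeMap<Long, TreeSet<String>> openByDeadline = new TreeMap<>();

    void newTxn(String txnId, long nowMs, long timeoutMs) {
        long deadline = nowMs + timeoutMs;
        headers.put(txnId, new Header("OPEN", deadline));
        openByDeadline.computeIfAbsent(deadline, k -> new TreeSet<>()).add(txnId);
    }

    /** OPEN -> target; returns false if the txn is unknown or already finalized. */
    boolean endTxn(String txnId, String target) {
        Header h = headers.get(txnId);
        if (h == null || !h.state().equals("OPEN")) {
            return false;
        }
        headers.put(txnId, new Header(target, h.deadlineMs()));
        TreeSet<String> bucket = openByDeadline.get(h.deadlineMs());
        bucket.remove(txnId);
        if (bucket.isEmpty()) {
            openByDeadline.remove(h.deadlineMs());
        }
        return true;
    }

    /** Aborts every OPEN txn whose deadline is at or before nowMs. */
    List<String> sweepTimeouts(long nowMs) {
        List<String> expired = new ArrayList<>();
        for (TreeSet<String> bucket : openByDeadline.headMap(nowMs, true).values()) {
            expired.addAll(bucket);
        }
        for (String txnId : expired) {
            endTxn(txnId, "ABORTED");
        }
        return expired;
    }

    String stateOf(String txnId) {
        return headers.get(txnId).state();
    }
}
```

The sweep reuses `endTxn`, so a client commit that races with the sweep simply loses or wins the same OPEN-to-terminal transition.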
Why parallel rather than reusing the legacy TC: the legacy TC's
per-shard system topic (`__transaction_log_*`) requires leadership
election, runs compaction over its own log, and pays a recovery cost
on every broker restart proportional to the live transaction set. The
v5 TC's state is just per-txn KV records — there is no log to compact
and no cold-start replay. Running both in parallel keeps v4
transactions byte-for-byte unchanged while the v5 path uses the
simpler design. A v5 client routes its `NEW_TXN` to the v5 TC; v4
clients route to the legacy TC. A single transaction does not span the
two.
#### Notification mechanism (TC → participants)
The legacy TC needs to RPC each participant (`END_TXN_ON_PARTITION`,
`END_TXN_ON_SUBSCRIPTION`) because the participants have no other way
to learn the decision — the TC's log is the only source of truth, and
only the TC reads it.
In the v5 design **the metadata store is the source of truth**, and
every participant already reads from it. Participants therefore learn
about state transitions directly from the store, without any
TC-to-broker RPC:
- A `MetadataTransactionBuffer` keeps an in-memory header cache for
txns it has writes from. The cache entries are populated when a write
op record is appended (the broker reads the header to authorize the
write) and **kept up to date by point-watches on the headers it has
cached**.
- A `MetadataPendingAckStore` maintains the same pattern for txns it
has acks from.
- When the TC CAS's `/txn/<txnId>.state` from OPEN →
COMMITTED/ABORTED, every cached watcher fires. Each participant
materializes locally:
- **Commit** — TB evicts its cache entry (the txn no longer pins
`maxReadPosition` back); PendingAckStore applies the buffered acks to
the cursor.
- **Abort** — TB marks the txn aborted in its cache (the
dispatcher's `isTxnAborted` will skip those entries); PendingAckStore
drops the buffered acks.
- After materialization, the participant deletes the op records it
owns (`/txn-op/<txnId>/<seq>` for ops on its segment / subscription).
- The TC's GC sweep (above) detects when all participants have done
their cleanup — the prefix `/txn-op/<txnId>/*` is empty — and deletes
the header.
Consequences:
- **End-txn latency.** From the client's perspective, `commit` returns
as soon as the header CAS lands. From a consumer's perspective,
freshly-committed entries become visible after the participant's
header watch fires and materialization runs. This delay is expected to
be on the order of tens of milliseconds and is bounded by
metadata-store watch propagation. (If a workload needs a tighter
bound, the TC can issue an optional `nudge` RPC to participants in
parallel with the CAS. That is not needed for correctness and is not
part of this PIP.)
- **No RPC fan-out from TC.** End-txn is `O(1)` work at the TC: one
CAS. The cost of fan-out is paid by the metadata store's
watch-delivery infrastructure, which already exists for other Pulsar
uses.
- **Crash idempotence.** A participant that crashes during
materialization restarts, observes the (already-final) header state
via its watch, and finishes materialization. The TC need not retry
anything.
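
The watch-driven notification and its crash idempotence can be sketched as below. Both classes are hypothetical illustrations: `HeaderWatchSketch` models a point-watch that delivers the current state on registration and every later change, which is an assumption about the watch semantics rather than a documented `MetadataStore` guarantee.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical header store with point-watches. */
final class HeaderWatchSketch {
    private final Map<String, String> stateByTxn = new HashMap<>();
    private final Map<String, List<Consumer<String>>> watchers = new HashMap<>();

    void open(String txnId) {
        stateByTxn.put(txnId, "OPEN");
    }

    /** Registers a watch; the callback gets the current state now and each later change. */
    void watch(String txnId, Consumer<String> onState) {
        watchers.computeIfAbsent(txnId, k -> new ArrayList<>()).add(onState);
        onState.accept(stateByTxn.get(txnId));
    }

    /** The TC's only action at end-txn: flip the state. Watchers do the rest. */
    boolean end(String txnId, String target) {
        if (!"OPEN".equals(stateByTxn.get(txnId))) {
            return false;
        }
        stateByTxn.put(txnId, target);
        for (Consumer<String> w : watchers.getOrDefault(txnId, List.of())) {
            w.accept(target);
        }
        return true;
    }
}

/** Hypothetical pending-ack participant that materializes at most once. */
final class ParticipantSketch {
    final List<Long> bufferedAcks = new ArrayList<>();
    final List<Long> appliedAcks = new ArrayList<>();
    private boolean materialized;

    void onHeaderState(String state) {
        if (state == null || materialized || "OPEN".equals(state)) {
            return;
        }
        if ("COMMITTED".equals(state)) {
            appliedAcks.addAll(bufferedAcks); // commit: apply buffered acks to the cursor
        }
        bufferedAcks.clear();                 // commit or abort: drop the buffered ops
        materialized = true;
    }
}
```

A participant that registers its watch after the header is already final (the restart case) receives the final state immediately and materializes without any retry from the TC.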
#### Dispatcher
Unchanged. It already asks
`topic.getTransactionBuffer().getMaxReadPosition()` and
`topic.getTransactionBuffer().isTxnAborted(...)`. The new TB
implements both.
### Flows
#### Publish (transactional)
```mermaid
sequenceDiagram
participant C as Client
participant B as Segment broker
participant ML as Managed Ledger
participant M as Metadata Store
C->>B: send(txnId, payload)
B->>M: read /txn/<txnId>.state (cached)
alt state != OPEN
B-->>C: TxnConflict
else state == OPEN
B->>ML: asyncAddEntry(payload)
ML-->>B: position
B->>M: put /txn-op/<txnId>/<seq> {kind=write, segment, position}
M-->>B: ack
B-->>C: send-ack
end
```
The header read is cache-first; the cache is invalidated by the same
watch the TB already maintains on the header. The op-record put is the
only synchronous metadata-store write on the publish path.
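
The ordering in the publish flow (header check, ledger append, op-record put, then the ack) can be sketched in a few lines. `PublishPathSketch` and `TxnConflictException` are hypothetical names, the lists stand in for the managed ledger and the metadata store, and the real path is asynchronous.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical synchronous model of the transactional publish path. */
final class PublishPathSketch {
    /** Named after the TxnConflict reply in the sequence diagram. */
    static final class TxnConflictException extends RuntimeException {
        TxnConflictException(String message) {
            super(message);
        }
    }

    final Map<String, String> headerCache = new HashMap<>(); // txnId -> state
    final List<byte[]> ledger = new ArrayList<>();            // stand-in for the managed ledger
    final List<String> opRecords = new ArrayList<>();         // stand-in for /txn-op records

    /** Returns the entry position once both the append and the op-record put are done. */
    long publish(String txnId, String segment, byte[] payload) {
        if (!"OPEN".equals(headerCache.get(txnId))) {
            throw new TxnConflictException("txn " + txnId + " is not OPEN");
        }
        ledger.add(payload);                  // exactly one append per send
        long position = ledger.size() - 1;
        opRecords.add("write|" + txnId + "|" + segment + "|" + position);
        return position;                      // only now is the send acked
    }
}
```

A publish against a finalized or unknown transaction is rejected before anything is appended, so the ledger is untouched in that case.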
#### End-txn (commit or abort)
```mermaid
sequenceDiagram
participant Cl as Client
participant TC as Transaction Coordinator V5
participant M as Metadata Store
participant P as Participant brokers
Cl->>TC: commit(txnId)
TC->>M: CAS /txn/<txnId>.state OPEN→COMMITTED, set finalized_ms
M-->>TC: ack
TC-->>Cl: ack
Note over M,P: Independently, asynchronously:
M-->>P: header watch fires
P->>P: materialize (cursor advance / cache evict)
P->>M: delete owned /txn-op/<txnId>/<seq> records
Note over TC,M: Later, GC sweep:
TC->>M: list /txn-op/<txnId>/* (empty? then delete header)
```
The CAS on the header is the linearization point — that is when the
transaction's outcome is decided. Notification of participants is not
part of the linearization; it propagates via the watches every
participant already maintains on the headers it has cached. Sealed
segments are fine — materialization is metadata + cursor work, no
managed-ledger writes.
#### Subscribe / dispatch
Unchanged. The dispatcher polls `tb.getMaxReadPosition()` and filters
by `tb.isTxnAborted(msg)`. The `MetadataTransactionBuffer` answers
both from its in-memory caches, fed by metadata-store watches.
#### Late-write race
The TC is mid-end-txn when the client publishes once more inside the
same transaction. The header CAS may have already flipped to
`COMMITTED`/`ABORTED`. The publish-path header check on the
participant broker rejects with `TxnConflict`. This mirrors today's TC
behavior (the TC marks the transaction as ending, i.e. `COMMITTING` or
`ABORTING`, and brokers reject new writes); the only difference is that
the rejection criterion is now read from the metadata store rather than
from a TC RPC.
### Recovery
- **Broker startup.** Each `MetadataTransactionBuffer` opens its index
watch and header cache. The first watch event delivers the snapshot;
the TB is ready as soon as the snapshot has been applied. No log
replay, no system-topic reader, no snapshot topic.
- **Broker crash mid-publish.** If the broker appended the entry but
crashed before writing the op record, the entry exists in the segment
but no metadata claims it. On txn timeout the TC aborts the txn; the
dispatcher's `isTxnAborted` check (which falls back to "abort" for
unknown txnIds at retention horizon) discards the entry.
- **Broker crash mid-end-txn.** If the header CAS landed but
materialization on a participant did not complete, the participant
re-derives state from the header on restart and finishes
materialization. End-txn is idempotent.
- **TC failover.** The v5 TC has no log to replay into memory: its
state lives in the metadata store. Whichever broker takes over
coordinator duty for a TC partition resumes operations directly from
the metadata-store records. Cold-start cost is bounded by
`idx:txn-by-deadline` and `idx:txn-by-final-state` scans, not by
replay of an entire transaction log.
### Concurrency and contention
- Each transactional publish writes a unique `/txn-op/<txnId>/<seq>`
record (server-assigned sequential key). There is no contention
between concurrent participants of the same transaction.
- The header is written at most twice per transaction lifetime (create
+ finalize), and only the finalize is a CAS, so contention there is
bounded.
- All records for a given txn share `partitionKey=txnId`, so per-txn
operations (list, range-delete) stay on a single partition.
- Index updates are managed by the metadata store; their scaling is
the store's concern.
---
## Public-facing Changes
### Public API
No changes. The client-facing `Transaction` API is unchanged.
### Binary protocol
No changes to client-facing wire commands (`NEW_TXN`,
`ADD_PARTITION_TO_TXN`, `ADD_SUBSCRIPTION_TO_TXN`, `END_TXN`) — the v5
TC accepts them with the same semantics as the legacy TC.
The broker-to-broker commands `END_TXN_ON_PARTITION` and
`END_TXN_ON_SUBSCRIPTION` are **not used** by the v5 path: participant
brokers learn about the decision by watching the metadata-store header
rather than by receiving an RPC from the TC. The legacy TC still uses
these commands for v4 transactions; they remain in the protocol
unchanged.
### Configuration
A per-namespace or per-broker setting selects the TB implementation.
Default for `segment://` topics: metadata-driven. Default for
`persistent://` topics: in-stream markers (unchanged). Override is
possible per-namespace for debugging / migration.
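
The selection rule can be sketched as a small pure function. `TxnBackendSelectorSketch`, the `Backend` enum, and the shape of the override are hypothetical; the PIP does not name the configuration key.

```java
import java.util.Optional;

/** Hypothetical selector for the TB / PendingAckStore implementation. */
final class TxnBackendSelectorSketch {
    enum Backend { IN_STREAM_MARKERS, METADATA_DRIVEN }

    /** Picks the backend from the topic domain unless a namespace override is set. */
    static Backend select(String topicName, Optional<Backend> namespaceOverride) {
        if (namespaceOverride.isPresent()) {
            return namespaceOverride.get();
        }
        return topicName.startsWith("segment://")
                ? Backend.METADATA_DRIVEN
                : Backend.IN_STREAM_MARKERS;
    }
}
```

For example, `select("segment://t/n/x/a", Optional.empty())` yields `METADATA_DRIVEN`, while any `persistent://` topic without an override keeps `IN_STREAM_MARKERS`.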
### Metrics
Existing transaction metrics remain. The metadata-driven implementation adds:
- `pulsar_txn_metadata_store_op_writes_total` (counter) — op records written.
- `pulsar_txn_metadata_store_header_cas_total{result="ok|conflict|reject"}`
(counter) — header CAS attempts and outcomes.
- `pulsar_txn_metadata_store_index_query_seconds` (histogram) —
latency of the index range queries on `idx:writes-by-segment` /
`idx:acks-by-segment-subscription`.
- `pulsar_txn_metadata_store_outstanding_op_records` (gauge) —
uncollected op records (a proxy for txn GC backlog).
Existing `pulsar_txn_tb_*` snapshot/replay metrics are not emitted by
the new implementation (no snapshots, no replay).
---
## Backward & Forward Compatibility
### Upgrade
- Existing `persistent://` topic behavior is unchanged. v4 clients see
no difference.
- Brokers running this PIP can interoperate with brokers that do not,
as long as a given **topic** is consistently served by brokers of one
kind. Because topic ownership is bundle-based and a load-balancer
migration carries the TB state over to the new owner, this holds
automatically.
- Per-segment pending-ack topics created by the workaround in #25631
(`persistent://t/n/<localName>-<descriptor>-<sub>__transaction_pending_ack`)
are no longer used. They are deleted as part of upgrade. Since the
workaround was only ever exercised by V5 transactional consumer flows,
the upgrade path is safe.
### Downgrade / Rollback
Not applicable. Scalable topics are introduced as a new feature in
Pulsar 5.0 ([PIP-460](pip-460.md)); this PIP defines transactional
support for that feature from the start. There is no prior version to
roll back to.
### Pulsar Geo-Replication
Out of scope. Transactional geo-replication is not supported in either model.
---
## Alternatives Considered
### A. Move TB to the scalable-topic level (one TB per logical topic)
This was an earlier draft of this design. It is architecturally clean
(decisions live above segments) but introduces a new broker-side singleton per
scalable topic, adds new failover semantics, and complicates the TC's
wire protocol (end-txn would need redirection from segment to
scalable-topic owner). Replacing the per-topic TB **implementation**
with a metadata-driven one achieves the same correctness without any
of that surface area.
### B. Per-segment TB but using an off-segment marker stream
Keep the per-segment TB; have it write commit/abort markers to a
**separate** managed ledger (e.g. a shadow topic) rather than into the
segment's own data. Sealed segments would no longer block end-txn.
Rejected because: (1) it doubles the data path (every txn needs a
write to the segment **and** to the shadow topic), (2) it requires a
new system-topic-per-segment, and (3) it does not eliminate the
snapshot/replay machinery that the metadata-driven approach removes
outright.
### C. Skip transactional support on scalable topics
Document scalable topics as non-transactional. Rejected: the
transactional consume-and-produce pattern is a primary use case for
scalable streaming workloads (Kafka Streams analogue), and PIP-460's
roadmap explicitly calls out transactions across range segments as a
Phase 4 deliverable.
---
## General Notes
The shape of the change is *one new `TransactionBuffer`
implementation, one new `PendingAckStore` implementation, one new
`TransactionCoordinator` implementation, and the `MetadataStore`
extensions to support partition-key co-location, sequential keys, and
secondary indexes with range-watch*. The complexity is in the
interaction of the metadata schema with the dispatcher's existing
assumptions, not in any new system component on the broker.
## Links
- [PIP-460: Scalable Topics](pip-460.md)
- [PIP-468: Scalable Topic Controller](pip-468.md)
--
Matteo Merli