dao-jun commented on code in PR #25873:
URL: https://github.com/apache/pulsar/pull/25873#discussion_r3380274456
##########
pip/pip-480.md:
##########
@@ -0,0 +1,153 @@
+# PIP-480: Add cursorless readEntries API to ManagedLedger
+
+# Background knowledge
+
+Most `ManagedLedger` reads go through `ManagedCursor`.
+A cursor is durable state: read position, mark-delete position, and individual
deleted entries.
+That is the right model for Pulsar subscriptions because the broker owns
subscription progress and uses cursor state for
+acknowledgement, backlog accounting, and ledger retention.
+
+Protocol handlers such as KoP (Kafka-on-Pulsar) do not always need that state. Kafka offsets are maintained outside Pulsar's
+`ManagedCursor` abstraction. If KoP opens a cursor only to fetch entries, it
creates extra managed-ledger metadata and
+then has to keep the cursor lifecycle aligned with Kafka offsets.
+
+`ManagedLedger` already exposes `asyncReadEntry(Position, ...)` for reading a
single entry by position, but it does not
+provide a batch read primitive for this use case. Callers have to repeat the
ledger traversal logic themselves: validate
+the start position, cross ledger boundaries, skip empty ledgers, and stop at
the current last confirmed entry.
+
+# Motivation
+
+Provide a cursorless read path for downstream projects like KoP.
+
+A caller should be able to pass a `Position` and a maximum entry count, and
get the entries that are readable now.
+The call must not open a `ManagedCursor`, update cursor metadata, or change
acknowledgement state.
+The downstream protocol remains responsible for its own offsets.
+
+# Goals
+
+## In Scope
+
+* Add a `ManagedLedger` API to read a batch of entries from a given position
without a cursor.
+* Treat an existing start position as inclusive.
+* Allow reads to cross ledger boundaries and skip empty ledgers, including empty ledgers that have already been removed.
+* Support `PositionFactory.EARLIEST` and `PositionFactory.LATEST`.
+* Return only entries that are already readable when the call is made.
+* Ignore cursor acknowledgement state.
+
+## Out of Scope
+
+* Replace `ManagedCursor` for Pulsar subscriptions.
+* Use cursorless reads to pin backlog or change ledger retention semantics.
+* Apply dispatcher-level filtering, such as subscription acknowledgement
state, delayed delivery, or transaction
+ visibility.
+
+# High Level Design
+
+Add new asynchronous methods to `ManagedLedger`:
+
+```java
+CompletableFuture<List<Entry>> asyncReadEntries(Position startPosition, int numberOfEntries, Position maxPosition);
+
+default CompletableFuture<List<Entry>> asyncReadEntries(Position startPosition, int numberOfEntries) {
+    return asyncReadEntries(startPosition, numberOfEntries, PositionFactory.LATEST);
+}
+```
+
+Both methods return up to `numberOfEntries` raw managed-ledger entries
starting from `startPosition`.
+`maxPosition` is inclusive — the entry at `maxPosition` itself is eligible for
return. The read stops when the next
+candidate read position would be strictly after `maxPosition`. The
two-parameter overload uses `PositionFactory.LATEST`
+as `maxPosition`, meaning it reads up to the current end of the managed
ledger. Neither method creates, updates, or
+consults a `ManagedCursor`.
+
+Before reading, the implementation validates arguments and normalizes the
start position:
+
+* If `maxPosition` is before the normalized start position, the call returns
an empty list.
+* `PositionFactory.EARLIEST` starts from the first available entry.
+* `PositionFactory.LATEST` starts after the current last confirmed entry.
+* An existing entry position is used directly.
+* A missing position before the end of the readable range is moved to the next
valid entry.
+* A position after the current last confirmed entry returns an empty list.
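
The normalization rules above can be sketched as follows. This is a minimal, self-contained model and not the PIP's implementation: `StartPositionSketch`, `Pos`, and `normalize` are hypothetical names, the managed ledger is reduced to a sorted map from ledger id to readable entry count, and the `EARLIEST` and `LATEST` constants are assumed stand-ins for the `PositionFactory` sentinels.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;

// Hypothetical model for illustration only; none of these names come from the PIP.
final class StartPositionSketch {
    static final class Pos {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    // Assumed stand-ins for the PositionFactory sentinels.
    static final Pos EARLIEST = new Pos(-1, -1);
    static final Pos LATEST = new Pos(Long.MAX_VALUE, Long.MAX_VALUE);

    /**
     * entriesPerLedger maps ledger id to the number of readable entries (0 means an empty ledger).
     * Returns the first readable position at or after start, or empty when nothing is readable.
     */
    static Optional<Pos> normalize(NavigableMap<Long, Long> entriesPerLedger, Pos start) {
        // Walk ledgers in id order, beginning with the start ledger or the next existing one.
        for (Map.Entry<Long, Long> ledger : entriesPerLedger.tailMap(start.ledgerId, true).entrySet()) {
            long ledgerId = ledger.getKey();
            // Only the start ledger honors the requested entry id; later ledgers begin at entry 0.
            long first = (ledgerId == start.ledgerId) ? Math.max(start.entryId, 0) : 0;
            if (first < ledger.getValue()) {
                return Optional.of(new Pos(ledgerId, first));
            }
            // Otherwise the ledger is empty or the entry id is past its end: move to the next ledger.
        }
        return Optional.empty();
    }
}
```

In this model an empty `Optional` corresponds to the cases where the call returns an empty list: a `LATEST` start, or a start after the last readable entry.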
+
+The current ledger is read through the active write handle. Closed ledgers are
read through the existing read-handle
+cache. The operation reads one ledger range at a time and continues until it
reaches the requested count, reaches or
+exceeds `maxPosition`, or hits the current readable end of the managed ledger.
+
+# Detailed Design
+
+## Public-facing Changes
+
+### Public API
+
+Add the following methods to `ManagedLedger`:
+
+```java
+/**
+ * Read entries from the managed ledger starting from the provided position.
+ *
+ * <p>The start position is inclusive when it points to an existing entry. If it points to a non-existing entry,
+ * the read starts from the next valid entry in ledger order. {@link PositionFactory#EARLIEST} starts from the first
+ * available entry, while {@link PositionFactory#LATEST} starts from the position after the current last confirmed
+ * entry. This method does not wait for future writes and will complete with fewer entries, or an empty list, when
+ * there are not enough currently readable entries.
+ *
+ * <p>The returned entries are raw ledger entries and are not filtered by any cursor acknowledgement state. Callers
+ * are responsible for releasing returned entries.
+ */
+CompletableFuture<List<Entry>> asyncReadEntries(Position startPosition, int numberOfEntries, Position maxPosition);
+
+default CompletableFuture<List<Entry>> asyncReadEntries(Position startPosition, int numberOfEntries) {
+    return asyncReadEntries(startPosition, numberOfEntries, PositionFactory.LATEST);
+}
+```
+
+Invalid arguments fail the returned future with `IllegalArgumentException`:
+
+* `startPosition == null`
+* `numberOfEntries <= 0`
+* `maxPosition == null`
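
The point of the list above is that invalid arguments are reported through the returned future, not thrown synchronously. A minimal sketch of that pattern, using only the JDK: `ReadArgsSketch` and `readEntries` are hypothetical names, `Object` and `String` stand in for `Position` and `Entry`, and the success path is a placeholder.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for illustration; not the PIP's implementation.
final class ReadArgsSketch {
    static CompletableFuture<List<String>> readEntries(Object startPosition, int numberOfEntries,
                                                       Object maxPosition) {
        if (startPosition == null) {
            return failed("startPosition must not be null");
        }
        if (numberOfEntries <= 0) {
            return failed("numberOfEntries must be positive");
        }
        if (maxPosition == null) {
            return failed("maxPosition must not be null");
        }
        // Placeholder: a real implementation would start the read here.
        return CompletableFuture.completedFuture(Collections.emptyList());
    }

    // Builds a future that is already failed, so callers see the error via the future only.
    private static CompletableFuture<List<String>> failed(String message) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalArgumentException(message));
        return future;
    }
}
```

A caller that uses `join()` on such a future would see a `CompletionException` whose cause is the `IllegalArgumentException`.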
+
+If the managed ledger is fenced or closed, the returned future fails with
`ManagedLedgerFencedException`.
+
+### Read semantics
+
+The implementation adds an internal `OpReadEntries` operation with the
following behavior:
+
+* It uses `startReadOperationOnLedger` to move a read from a deleted empty
ledger to the next available ledger.
+* It uses the managed-ledger `lastConfirmedEntry` as the upper visibility
bound for the current ledger.
+* It uses the BookKeeper `lastAddConfirmed` value for closed ledgers.
+* The per-entry position check treats `maxPosition` as inclusive: a read
position equal to `maxPosition` is
+ allowed; a read position strictly after `maxPosition` stops the operation.
This constraint overrides any
+ ledger-level upper bound — an explicit `maxPosition` before
`lastConfirmedEntry` ends the read early.
+* It reads at most the remaining requested count from each ledger.
+* It completes immediately when it reaches the current ledger end. It does not
wait for future writes.
+* If a later ledger read fails after some entries have already been collected,
the operation completes with the partial
Review Comment:
1. return partial
2. stop operation
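
The quoted bullet is cut off in this notification, so the following is only one possible reading of the two points, written as a synchronous sketch. `PartialReadSketch` and `readAcrossLedgers` are hypothetical names, each `Supplier` stands in for one ledger read, and rethrowing when nothing has been collected yet is an assumption of the sketch, not something the quoted text states.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model for illustration; not the PIP's OpReadEntries.
final class PartialReadSketch {
    static List<String> readAcrossLedgers(List<Supplier<List<String>>> ledgerReads, int numberOfEntries) {
        List<String> collected = new ArrayList<>();
        for (Supplier<List<String>> read : ledgerReads) {
            if (collected.size() >= numberOfEntries) {
                break; // requested count reached
            }
            List<String> batch;
            try {
                batch = read.get();
            } catch (RuntimeException e) {
                if (collected.isEmpty()) {
                    throw e; // assumption: a failure before any entry is collected is surfaced
                }
                break; // 2. stop the operation: no further ledgers are read
            }
            for (String entry : batch) {
                if (collected.size() >= numberOfEntries) {
                    break;
                }
                collected.add(entry);
            }
        }
        return collected; // 1. return the partial result collected so far
    }
}
```

With reads that yield `[a, b]`, then fail, then would yield `[c]`, this sketch returns `[a, b]` and never invokes the third read.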