Hi Giannis,
Thanks for the thoughtful reply. A few clarifications.

On Option B and log_offset: Option B keeps the same pinned snapshot across
all batches, so the log_offset guarantee is identical to Option A.

On FetchLog: FetchLog is stateless on the tablet server. The client holds
the offset and sends it with each request. There is no server-side session
between RPCs. The ScannerManager would be the first per-client session
state on a tablet server.

On the iterator being "slightly fewer bytes per session": I ran a test
against Fluss's default RocksDB config (64MB write buffer) with separate DB
instances and identical workloads. A held iterator pinned 23 MB of memtable
memory *rocksdb.size-all-mem-tables) compared to 2 KB with snapshot only,
because the iterator pins the physical memtable even after flush. This is
consistent with what the RocksDB wiki documents, and why it recommends
keeping iterators short-lived. The seek overhead for Option B was 29 µs per
batch, under 1% of the time spent reading a 4MB batch.

Not suggesting this blocks the vote, so let's proceed with these
performance concerns being flagged.

KR,
Anton

пт, 27 мар. 2026 г. в 08:33, Lorenzo Affetti <[email protected]
>:

> @jark thanks for clarifying.
>
> 1) Yeah, I see your point.
> The first "stateless" design I proposed misses the consistency aspect.
> The second "design" for saving snapshots is quite similar to this one,
> which is paginated.
> Thanks for catching the similarity and clarifying.
>
> 2) Thanks for highlighting the precise design choice to favor pull vs push.
> That definitely makes sense.
>
> Sorted out, thank you!
>
> On Fri, Mar 27, 2026 at 3:49 AM Anton Borisov <[email protected]>
> wrote:
>
>> Hi Giannis,
>> Thanks for working on this, my concern is the continuation model.
>>
>> The current design makes the server own both consistency and progress by
>> keeping a live iterator session across RPCs. That is efficient, but it is
>> also what makes retries awkward and assumes single-threaded iterator
>> access, without enforcing it on the server side. More generally, once we
>> keep server-side scan state across calls, we take on the usual session
>> lifecycle machinery around expiry, fencing, and cleanup.
>>
>> There’s a middle ground worth evaluating explicitly: keep snapshot
>> isolation if we need it, but return an opaque resume token, similar to
>> Cassandra paging state, and have the client send it back unchanged on the
>> next request. That keeps the consistency guarantee, but moves scan progress
>> into the protocol instead of a live iterator session on the server. The
>> tradeoff is an extra seek per batch. So in return, retries become much
>> easier to reason about and the server has less mutable state to manage
>> between calls.
>>
>> This feels close to what Lorenzo was asking to see evaluated. Also
>> continuing Jark's reasoning about  `call_seq_id` being simpler than
>> key-based cursors -  I think the comparison shifts when the token is opaque
>> server-produced bytes rather than a client-constructed key, so the client
>> never serializes anything, it just echoes the token back.
>>
>> Also I feel it would be good if the FIP compared:
>>
>>    - live iterator session(HBase approach described in original FIP)
>>    - snapshot-only with continuation token(Cassandra style)
>>    - opaque stateless cursor(no snapshot isolation at all)
>>
>> and say why the extra machinery is justified for the use cases we’re
>> targeting.
>>
>> KR,
>> Anton
>>
>> чт, 26 мар. 2026 г. в 17:11, Giannis Polyzos <[email protected]>:
>>
>>> Hi Jark,
>>>
>>> Thank you for your input and for providing more context... I updated the
>>> FIP based on your suggestions.
>>>
>>> I was debating with the API design, and my new design didn't take into
>>> account the old design with the BatchScanner. I will follow the old
>>> design
>>> since it's more consistent to what we have now.
>>>
>>> Best,
>>> Giannis
>>>
>>> On Thu, Mar 26, 2026 at 5:50 PM Jark Wu <[email protected]> wrote:
>>>
>>> > Hi Lorenzo,
>>> >
>>> > I can help clarify some of the design decisions, as I was involved in
>>> > the initial design of this FIP.
>>> >
>>> > (1) paginated requests with snapshot ID
>>> >
>>> > The current design of this FIP is exactly a paginated request with a
>>> > snapshot ID. When a new scan request is initiated, the server takes a
>>> > snapshot, and all subsequent streaming scan requests retrieve
>>> > paginated data from that specific snapshot.
>>> >
>>> > The only distinction lies in the pagination mechanism. Your proposal
>>> > uses keys, whereas the FIP utilizes `call_seq_id` (effectively a page
>>> > ID). Adopting `call_seq_id` is a simpler and more efficient
>>> > implementation. Using keys would be heavier, as they may require
>>> > serializing multiple fields, significantly increasing the request
>>> > payload size.
>>> >
>>> >
>>> > (2) true streaming RPC
>>> > IIUC, your design proposes a push-based model, whereas this FIP adopts
>>> > a pull-based model, consistent with the architecture of Fluss and
>>> > Kafka (e.g., in `FetchLog`). We chose the pull model for two key
>>> > reasons:
>>> >
>>> > *   Flow Control: In a push model, the server dictates the rate,
>>> > risking consumer overload and causing backpressure that can exhaust
>>> > server threads. The pull model allows clients to fetch data only when
>>> > they are ready, naturally preventing congestion.
>>> > *   Replayability: Handling network failures is simpler with pulling.
>>> > Clients can easily re-fetch missing data using a `page_id` or log
>>> > offset. In contrast, managing state, offsets, and varying consumer
>>> > speeds for thousands of clients in a push model adds significant
>>> > complexity, effectively turning the server into a stateful dispatcher.
>>> >
>>> > Best,
>>> > Jark
>>> >
>>> >
>>> >
>>> > On Fri, 27 Mar 2026 at 00:28, Jark Wu <[email protected]> wrote:
>>> > >
>>> > > Hi Giannis,
>>> > >
>>> > > Thanks for driving this. I only have some questions on the FIP.
>>> > >
>>> > > (1) Scan API
>>> > > The FIP says "Remove the LIMIT requirement from
>>> > > TableScan.createBatchScanner(TableBucket) for PK tables" which reuses
>>> > > the existing "Table.newScan.createBatchScan(..)" API. But the Public
>>> > > Interface section introduce a new API "Table.newKvScan.execute()".
>>> > >
>>> > > I believe extending the existing "Table.newScan().createBatchScan()"
>>> > > to support non-LIMIT queries would provide a more unified approach.
>>> > > Since this API already handles KV scan functionality, introducing a
>>> > > separate KvScan interface could confuse users regarding which method
>>> > > to choose.
>>> > >
>>> > > (2) The protobuf definition currently uses `uint32` and `uint64` for
>>> > > several fields. Since Java lacks native support for unsigned
>>> integers,
>>> > > this forces us to rely on `Integer.toUnsignedLong()` wrappers
>>> > > throughout the codebase, adding unnecessary complexity. Switching to
>>> > > `int32` and `int64` would significantly simplify the logic and
>>> improve
>>> > > code readability.
>>> > >
>>> > > (3) Configuration.
>>> > > I have a minor suggestion regarding the configuration naming. Since
>>> > > this scanner is designed exclusively for KV tables, and our existing
>>> > > server-side KV configurations consistently use the kv.* prefix, I
>>> > > recommend changing server.scanner.* to kv.scanner.*. This adjustment
>>> > > will ensure better consistency with our current naming conventions."
>>> > >
>>> > > Best,
>>> > > Jark
>>> > >
>>> > > On Thu, 26 Mar 2026 at 16:44, Lorenzo Affetti via dev
>>> > > <[email protected]> wrote:
>>> > > >
>>> > > > Ehy Giannis! Sorry for the late reply.
>>> > > >
>>> > > > *Is the ScannerManager / session state actually required?*
>>> > > >
>>> > > > *The key thing with this design is snapshot isolation across the
>>> entire
>>> > > > bucket scan.*
>>> > > > *A ScannerContext takes a RocksDB snapshot and a
>>> ResourceGuard.Lease,
>>> > and
>>> > > > all subsequent batches read from that snapshot.*
>>> > > >
>>> > > > I understand, however, with the same approach, I see easier ways to
>>> > > > implement this. For example, make the client reply not only with a
>>> > > > "last_key" but also with a "snapshot_id".
>>> > > > The server would need to store the open snapshots (this is the
>>> stateful
>>> > > > part) to re-use them upon request.
>>> > > > You can drop the iterator, but hold a pointer to the snapshot; this
>>> > should
>>> > > > tell RocksDB not to compact it out as the snapshot is not released.
>>> > > >
>>> > > > This approach would drastically reduce server-side complexity and
>>> hold
>>> > > > significantly less state.
>>> > > > I think this should be worth exploring, and if really impossible
>>> added
>>> > as a
>>> > > > rejected alternative.
>>> > > >
>>> > > >
>>> > > > *Could it be a single streaming RPC?*
>>> > > >
>>> > > > *Since the Fluss RPC layer is a custom Netty-based request/response
>>> > > > protocol, not gRPC, introducing a server-streaming primitive would
>>> > require
>>> > > > many changes and effort.*
>>> > > >
>>> > > > I understand, however, missing such a primitive still impacts
>>> Fluss,
>>> > and it
>>> > > > seems to me that we are still paying here the cost of implementing
>>> a
>>> > custom
>>> > > > streaming RPC for a special case.
>>> > > > It may be worth investing in the transport primitive once rather
>>> than
>>> > > > duplicating this pattern every time a streaming operation is
>>> needed.
>>> > > > One example is log tailing, from polling to true streaming RPC.
>>> Another
>>> > > > example is changelog streaming.
>>> > > >
>>> > > > What if we opt for streaming RPC FIP before this one so that this
>>> can
>>> > > > directly benefit from that?
>>> > > >
>>> > > >
>>> > > > On Fri, Mar 13, 2026 at 7:12 PM Giannis Polyzos <
>>> [email protected]
>>> > >
>>> > > > wrote:
>>> > > >
>>> > > > > Hi Lorenzo,
>>> > > > >
>>> > > > > And thank you for your comments. Lots of cool things to digest
>>> here,
>>> > and I
>>> > > > > would be eager to hear more people's thoughts.
>>> > > > >
>>> > > > > Regarding the API
>>> > > > > table.newScan()
>>> > > > >      .withSource(ScanSource.LIVE_KV)   // vs SNAPSHOT, LOG
>>> > > > >      .execute()
>>> > > > >
>>> > > > > Currently, the scan method is based on the table creation, so I
>>> want
>>> > to
>>> > > > > differentiate between a scan (streaming consumption of a log) and
>>> > KvScan (a
>>> > > > > bounded rocksdb full table query). Then there is also some
>>> > functionality
>>> > > > > that we might not want to expose to the user, for example about
>>> the
>>> > > > > snapshot. This should be internal functionality.
>>> > > > >
>>> > > > > *Is the ScannerManager / session state actually required?*
>>> > > > > Pagination can definitely be a simple approach. However, the key
>>> > thing
>>> > > > > with this design is snapshot isolation across the entire bucket
>>> > scan. A
>>> > > > > ScannerContext takes a RocksDB snapshot and a
>>> ResourceGuard.Lease,
>>> > and all
>>> > > > > subsequent batches read from that snapshot. This means:
>>> > > > > 1. Every key that existed at scan-open time is returned exactly
>>> once.
>>> > > > > 2. No key is duplicated because it was compacted and rewritten
>>> > between two
>>> > > > > requests.
>>> > > > > 3. No key is skipped because it was deleted and reinserted
>>> between
>>> > two
>>> > > > > requests.
>>> > > > > With cursor pagination, you lose that guarantee.
>>> > > > >
>>> > > > >
>>> > > > > *Could it be a single streaming RPC?*
>>> > > > > This would be a great approach, but my understanding is that we
>>> can't
>>> > > > > support this. Since the Fluss RPC layer is a custom Netty-based
>>> > > > > request/response protocol, not gRPC, introducing a
>>> server-streaming
>>> > > > > primitive would require many changes and effort. Let me know if
>>> I'm
>>> > > > > mistaken.
>>> > > > >
>>> > > > > Best,
>>> > > > > Giannis
>>> > > > >
>>> > > > >
>>> > > > > On Fri, Mar 13, 2026 at 2:34 PM Lorenzo Affetti via dev <
>>> > > > > [email protected]> wrote:
>>> > > > >
>>> > > > >> Hello Giannis!
>>> > > > >> Very instructive read, I went through that and I was actually
>>> > astonished
>>> > > > >> that this was not the "default" mode that Fluss operates on PK
>>> > tables when
>>> > > > >> a snapshot is requested.
>>> > > > >> Very good that we have this initiative.
>>> > > > >>
>>> > > > >> *Is a new scanner type explicitly required?*
>>> > > > >>
>>> > > > >> I don't think it needs to be addressed now, but a good thing to
>>> > keep in
>>> > > > >> mind after this FIP gets implemented.
>>> > > > >>
>>> > > > >>
>>> > > > >> The cleaner design would arguably be to unify these under a
>>> single
>>> > > > >> abstraction where the scan source (live RocksDB vs. remote
>>> > snapshot) and
>>> > > > >> chunking behavior are implementation details, not separate
>>> scanner
>>> > > > >> classes.
>>> > > > >> The KvScan interface being introduced (table.newKvScan()) is a
>>> step
>>> > in
>>> > > > >> that
>>> > > > >> direction but it still sits alongside rather than replacing the
>>> > existing
>>> > > > >> scan path.
>>> > > > >>
>>> > > > >> A truly unified design might look like:
>>> > > > >>
>>> > > > >> table.newScan()
>>> > > > >>      .withSource(ScanSource.LIVE_KV)   // vs SNAPSHOT, LOG
>>> > > > >>      .execute()
>>> > > > >>
>>> > > > >> I hope we keep this in mind for future, relevant situations.
>>> > > > >>
>>> > > > >> ---------
>>> > > > >>
>>> > > > >> *Is the ScannerManager / session state actually required?*
>>> > > > >>
>>> > > > >> The session exists because of the chunked, multi-RPC design --
>>> the
>>> > server
>>> > > > >> needs to remember where the iterator is between calls.
>>> > > > >> But is that statefulness necessary at all?
>>> > > > >>
>>> > > > >> The stateless alternative: cursor-based pagination
>>> > > > >>
>>> > > > >> Instead of keeping a server-side iterator session, you could do
>>> > stateless
>>> > > > >> resumption using a last-seen key as a cursor:
>>> > > > >>
>>> > > > >> Request 1: scan from beginning   -> returns rows +
>>> > last_key="key_0500"
>>> > > > >> Request 2: scan from "key_0500"  -> returns rows +
>>> > last_key="key_1000"
>>> > > > >> Request 3: scan from "key_1000"  -> ...
>>> > > > >>
>>> > > > >> Each request is fully independent. The server opens a fresh
>>> RocksDB
>>> > > > >> iterator, seeks to the cursor key, reads a batch, and closes
>>> it. No
>>> > > > >> session, no TTL, no ScannerManager.
>>> > > > >>
>>> > > > >> Advantages:
>>> > > > >>
>>> > > > >>    - Massively simpler server side -- no session lifecycle, no
>>> TTL
>>> > reaper,
>>> > > > >>    no leadership fencing complexity
>>> > > > >>    - Naturally resilient to server restarts and leadership
>>> changes
>>> > --
>>> > > > >>    client just retries from the last cursor
>>> > > > >>    - No SCANNER_EXPIRED / UNKNOWN_SCANNER_ID error classes
>>> needed
>>> > > > >>
>>> > > > >> Tradeoffs:
>>> > > > >>
>>> > > > >>    - *Consistency weakens slightly* -- each batch opens a fresh
>>> > RocksDB
>>> > > > >>    snapshot, so you might see a key move between batches if it
>>> was
>>> > updated
>>> > > > >>    between requests. With the session approach, the entire
>>> bucket
>>> > scan is
>>> > > > >>    under one snapshot.
>>> > > > >>    - Seek cost -- RocksDB iterator seeks are not free,
>>> especially
>>> > across
>>> > > > >>    many SST files. For very large tables with many chunks this
>>> adds
>>> > up,
>>> > > > >> though
>>> > > > >>    for the small key spaces FIP-17 targets it's likely
>>> negligible.
>>> > > > >>    - Cursor encoding needs care for binary keys.
>>> > > > >>
>>> > > > >> Could it be a single streaming RPC?
>>> > > > >>
>>> > > > >> Rather than a request/response sequence with session state,
>>> you'd
>>> > have a
>>> > > > >> single server-streaming RPC:
>>> > > > >>
>>> > > > >> client -> ScanKvRequest (open)
>>> > > > >> server -> stream of ScanKvResponse chunks
>>> > > > >> server -> closes stream when exhausted
>>> > > > >>
>>> > > > >> If this is possible, the entire ScannerManager session
>>> complexity
>>> > > > >> evaporates -- the iterator just lives for the duration of the
>>> > stream, held
>>> > > > >> naturally by the connection.
>>> > > > >>
>>> > > > >>
>>> > > > >>
>>> > > > >> On Tue, Mar 10, 2026 at 9:59 AM Keith Lee <
>>> > [email protected]>
>>> > > > >> wrote:
>>> > > > >>
>>> > > > >> > Hello Giannis,
>>> > > > >> >
>>> > > > >> > Thank you for the update to the proposal! Quickly skimmed
>>> through
>>> > and I
>>> > > > >> > like the updates that you’ve made! Questions / comments below:
>>> > > > >> >
>>> > > > >> > 1. You mentioned an extra section on heartbeat on the FIP,
>>> but I
>>> > do not
>>> > > > >> see
>>> > > > >> > heartbeat being mentioned on the latest version of the FIP?
>>> +1
>>> > If the
>>> > > > >> > proposal is updated to rely solely on last scan for TTL and
>>> remove
>>> > > > >> > heartbeat, it’s a great change. If I remember correctly, the
>>> > previous
>>> > > > >> > version was to use heartbeat as keepalive, there is a risk of
>>> > unclosed,
>>> > > > >> > idle scanner holding resources on server side indefinitely and
>>> > causing
>>> > > > >> > leak.
>>> > > > >> >
>>> > > > >> > 2. On continuation request, should we check lastAccessTimeMs
>>> and
>>> > reject
>>> > > > >> if
>>> > > > >> > elapsed time is larger than TTL? Otherwise sessions can idle
>>> > between 60
>>> > > > >> and
>>> > > > >> > 90 (TTL+ reaper interval). This might be exacerbated if user
>>> > configure
>>> > > > >> > particularly high TTL and reaper interval.
>>> > > > >> >
>>> > > > >> > 3. On SCANNER_EXPIRED, is it necessary to have a separate
>>> error
>>> > for
>>> > > > >> expired
>>> > > > >> > scanner? We can have a single UNKNOWN_OR_EXPIRED_SCANNER
>>> (renaming
>>> > > > >> > UNKNOWN_SCANNER_ID). These are both terminal and non
>>> retriable, I
>>> > > > >> imagine
>>> > > > >> > that handling it from client side would not differ. It’s also
>>> a
>>> > small
>>> > > > >> > simplification to the implementation.
>>> > > > >> >
>>> > > > >> > 4. On pipelining. If the user queries for top-n every 10
>>> seconds
>>> > to
>>> > > > >> update
>>> > > > >> > leaderboard, would pipelining cause higher unnecessary
>>> traffic?
>>> > E.g.
>>> > > > >> they
>>> > > > >> > only care about n records but pipelining automatically fetch
>>> up
>>> > to 8mb.
>>> > > > >> >
>>> > > > >> > 5. Also on pipelining, while it seems that we’re keeping Flink
>>> > connector
>>> > > > >> > out of scope, IIRC Flink split fetcher also pipelines. If we
>>> use
>>> > this to
>>> > > > >> > update Flink connector, we’d have higher amount buffered in
>>> > pipeline.
>>> > > > >> >
>>> > > > >> > 6. On expiration interval, should we hide that configuration
>>> and
>>> > choose
>>> > > > >> to
>>> > > > >> > expose it if there’s a strong need for it? It’s fewer config
>>> for
>>> > users
>>> > > > >> to
>>> > > > >> > reason about and 30s expiration sounds like a good starting
>>> point.
>>> > > > >> >
>>> > > > >> > Best regards
>>> > > > >> >
>>> > > > >> > Keith Lee
>>> > > > >> >
>>> > > > >> >
>>> > > > >> > On Tue, 10 Mar 2026 at 08:49, Giannis Polyzos <
>>> > [email protected]>
>>> > > > >> > wrote:
>>> > > > >> >
>>> > > > >> > > Hi devs,
>>> > > > >> > > Let me know if there are any comments here, otherwise I
>>> would
>>> > like to
>>> > > > >> > start
>>> > > > >> > > a vote thread.
>>> > > > >> > >
>>> > > > >> > > Best,
>>> > > > >> > > Giannis
>>> > > > >> > >
>>> > > > >> > > On Thu, 5 Mar 2026 at 3:38 PM, Giannis Polyzos <
>>> > [email protected]
>>> > > > >> >
>>> > > > >> > > wrote:
>>> > > > >> > >
>>> > > > >> > > > Hi devs,
>>> > > > >> > > >
>>> > > > >> > > > After a long time, i will like to reinitiate the
>>> discussions
>>> > on
>>> > > > >> FIP-17.
>>> > > > >> > > >
>>> > > > >> > > > I made quite a few updates on the FIP, which you can find
>>> > here:
>>> > > > >> > > >
>>> > > > >> > > >
>>> > > > >> > >
>>> > > > >> >
>>> > > > >>
>>> >
>>> https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Primary+Key+Table+Snapshot+Queries
>>> > > > >> > > > and updated the title to better reflect the goal. Let me
>>> know
>>> > if it
>>> > > > >> > makes
>>> > > > >> > > > sense.
>>> > > > >> > > >
>>> > > > >> > > > Moreover in the end of the proposal, you will find a
>>> section
>>> > as
>>> > > > >> *extras
>>> > > > >> > > *which
>>> > > > >> > > > has a suggestion for a heartbeat mechanism. However,
>>> during
>>> > my PoC,
>>> > > > >> I
>>> > > > >> > > found
>>> > > > >> > > > that this is not really needed, but
>>> > > > >> > > > I would like your thoughts and feedback first.
>>> > > > >> > > >
>>> > > > >> > > > Best,
>>> > > > >> > > > Giannis
>>> > > > >> > > >
>>> > > > >> > > > On Wed, Oct 29, 2025 at 2:45 PM Giannis Polyzos <
>>> > > > >> [email protected]
>>> > > > >> > >
>>> > > > >> > > > wrote:
>>> > > > >> > > >
>>> > > > >> > > >> Yang, thank you for your thoughtful comments.
>>> > > > >> > > >>
>>> > > > >> > > >> Indeed, we are streaming the results to the client;
>>> however,
>>> > it's
>>> > > > >> > still
>>> > > > >> > > a
>>> > > > >> > > >> batch operation. We could use "KV store (or PK table)
>>> > Snapshot
>>> > > > >> Query"
>>> > > > >> > > or
>>> > > > >> > > >> something similar, since we are querying a RocksDB
>>> snapshot.
>>> > WDYT?
>>> > > > >> > > >> The newly introduced KvBatchScanner should be able to be
>>> > reused
>>> > > > >> from
>>> > > > >> > > both
>>> > > > >> > > >> the client itself - assume a scenario that I want to
>>> > periodically
>>> > > > >> > query
>>> > > > >> > > the
>>> > > > >> > > >> full RocksDB KV store to power real-time dashboards - as
>>> > well as
>>> > > > >> Flink
>>> > > > >> > > >> (with more engines to follow later).
>>> > > > >> > > >> It issues requests to fetch the results per bucket and
>>> > transmit
>>> > > > >> them
>>> > > > >> > > back
>>> > > > >> > > >> to the client.
>>> > > > >> > > >>
>>> > > > >> > > >> > Could you elaborate on why the new KvBatchScanner isn't
>>> > reusable?
>>> > > > >> > > >> I think the reasoning here is that reach requests create
>>> a
>>> > new
>>> > > > >> > > >> KvBatchScanner, which polls the records and then closes
>>> > > > >> automatically.
>>> > > > >> > > Any
>>> > > > >> > > >> reason you see this as a limitation, and we should
>>> consider
>>> > making
>>> > > > >> it
>>> > > > >> > > >> reusable?
>>> > > > >> > > >>
>>> > > > >> > > >> The design aims mainly for the Fluss client API.. Should
>>> we
>>> > add an
>>> > > > >> > > >> integration design with Flink? Wang Cheng, WDYT?
>>> > > > >> > > >>
>>> > > > >> > > >> Best,
>>> > > > >> > > >> Giannis
>>> > > > >> > > >>
>>> > > > >> > > >>
>>> > > > >> > > >>
>>> > > > >> > > >> On Tue, Oct 28, 2025 at 4:44 AM Yang Wang <
>>> > > > >> [email protected]>
>>> > > > >> > > >> wrote:
>>> > > > >> > > >>
>>> > > > >> > > >>> Hi Cheng,
>>> > > > >> > > >>>
>>> > > > >> > > >>> Thank you for driving this excellent work! Your FIP
>>> > document shows
>>> > > > >> > > great
>>> > > > >> > > >>> thought and initiative. I've gone through it and have
>>> some
>>> > > > >> questions
>>> > > > >> > > and
>>> > > > >> > > >>> suggestions that I hope can further enhance this
>>> valuable
>>> > > > >> > contribution.
>>> > > > >> > > >>>
>>> > > > >> > > >>> 1、Regarding the Title, I believe we could consider
>>> changing
>>> > it to
>>> > > > >> > > >>> "Support
>>> > > > >> > > >>> full scan in batch mode for PrimaryKey Table". The term
>>> > > > >> "Streaming"
>>> > > > >> > > might
>>> > > > >> > > >>> cause confusion with Flink's streaming/batch modes, and
>>> this
>>> > > > >> revised
>>> > > > >> > > >>> title
>>> > > > >> > > >>> would provide better clarity.
>>> > > > >> > > >>>
>>> > > > >> > > >>> 2、In the Motivation section, I think there are two
>>> > particularly
>>> > > > >> > > important
>>> > > > >> > > >>> benefits worth highlighting: (1) OLAP engines will be
>>> able
>>> > to
>>> > > > >> perform
>>> > > > >> > > >>> full
>>> > > > >> > > >>> snapshot reads on Fluss primary-key tables. (2) This
>>> > approach can
>>> > > > >> > > replace
>>> > > > >> > > >>> the current KvSnapshotBatchScanner, allowing the Fluss
>>> > client to
>>> > > > >> > > >>> eliminate
>>> > > > >> > > >>> its RocksDB dependency entirely.
>>> > > > >> > > >>>
>>> > > > >> > > >>> 3、Concerning the Proposed Changes, could you clarify
>>> when
>>> > exactly
>>> > > > >> the
>>> > > > >> > > >>> client creates a KV snapshot on the server side, and
>>> when
>>> > we send
>>> > > > >> the
>>> > > > >> > > >>> bucket_scan_req?
>>> > > > >> > > >>>
>>> > > > >> > > >>> Let me share my thinking on this: When Flink attempts to
>>> > read
>>> > > > >> from a
>>> > > > >> > > >>> PrimaryKey table, the FlinkSourceEnumerator in the
>>> JobMaster
>>> > > > >> > generates
>>> > > > >> > > >>> HybridSnapshotLogSplit and dispatches them to
>>> SplitReaders
>>> > > > >> running on
>>> > > > >> > > the
>>> > > > >> > > >>> TaskManager. The JobMaster doesn't actually read data—it
>>> > merely
>>> > > > >> > defines
>>> > > > >> > > >>> and
>>> > > > >> > > >>> manages the splits. Therefore, we need to ensure the JM
>>> has
>>> > > > >> > sufficient
>>> > > > >> > > >>> information to determine the boundary of the KV snapshot
>>> > and the
>>> > > > >> > > >>> startOffset of the LogSplit.
>>> > > > >> > > >>>
>>> > > > >> > > >>> I suggest we explicitly create a snapshot (or as you've
>>> > termed
>>> > > > >> it, a
>>> > > > >> > > >>> new_scan_request) on the server side. This way, the
>>> > > > >> > > FlinkSourceEnumerator
>>> > > > >> > > >>> can use it to define a HybridSnapshotLogSplit, and the
>>> > > > >> SplitReaders
>>> > > > >> > can
>>> > > > >> > > >>> perform pollBatch operations on this snapshot (which
>>> would
>>> > be
>>> > > > >> bound
>>> > > > >> > to
>>> > > > >> > > >>> the
>>> > > > >> > > >>> specified scanner_id).
>>> > > > >> > > >>>
>>> > > > >> > > >>> 4、 Could you elaborate on why the new KvBatchScanner
>>> isn't
>>> > > > >> reusable?
>>> > > > >> > > >>> What's
>>> > > > >> > > >>> the reasoning behind this limitation? (I believe RocksDB
>>> > > > >> iterators do
>>> > > > >> > > >>> support the seekToFirst operation.) If a TaskManager
>>> fails
>>> > over
>>> > > > >> > before
>>> > > > >> > > a
>>> > > > >> > > >>> checkpoint, rescanning an existing snapshot seems like a
>>> > natural
>>> > > > >> > > >>> requirement.
>>> > > > >> > > >>>
>>> > > > >> > > >>> 5、I think it would be beneficial to include some
>>> detailed
>>> > design
>>> > > > >> > > aspects
>>> > > > >> > > >>> regarding Flink's integration with the new BatchScanner.
>>> > > > >> > > >>>
>>> > > > >> > > >>> Overall, this is a solid foundation for an important
>>> > enhancement.
>>> > > > >> > > Looking
>>> > > > >> > > >>> forward to discussing these points further!
>>> > > > >> > > >>>
>>> > > > >> > > >>> Best regards, Yang
>>> > > > >> > > >>>
>>> > > > >> > > >>> Wang Cheng <[email protected]> 于2025年10月22日周三
>>> > 17:09写道:
>>> > > > >> > > >>>
>>> > > > >> > > >>> > Hi all,
>>> > > > >> > > >>> >
>>> > > > >> > > >>> >
>>> > > > >> > > >>> > As of v0.8, Fluss only supports KV snapshot batch
>>> scan and
>>> > > > >> limit KV
>>> > > > >> > > >>> batch
>>> > > > >> > > >>> > scan. The former approach is constrained by snapshot
>>> > > > >> availability
>>> > > > >> > and
>>> > > > >> > > >>> > remote storage performance, while the later one is
>>> only
>>> > > > >> applicable
>>> > > > >> > to
>>> > > > >> > > >>> > queries with LIMIT clause and risks high memory
>>> pressure.
>>> > > > >> > > >>> >
>>> > > > >> > > >>> >
>>> > > > >> > > >>> > To address those limitations, Giannis Polyzos and I
>>> are
>>> > writing
>>> > > > >> to
>>> > > > >> > > >>> propose
>>> > > > >> > > >>> > FIP-17: a general-purpose streaming KV scan for Fluss
>>> [1].
>>> > > > >> > > >>> >
>>> > > > >> > > >>> >
>>> > > > >> > > >>> > Any feedback and suggestions on this proposal are
>>> welcome!
>>> > > > >> > > >>> >
>>> > > > >> > > >>> >
>>> > > > >> > > >>> > [1]:
>>> > > > >> > > >>> >
>>> > > > >> > > >>>
>>> > > > >> > >
>>> > > > >> >
>>> > > > >>
>>> >
>>> https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Streaming+KV+Scan+RPC
>>> > > > >> > > >>> >
>>> > > > >> > > >>> > Regards,
>>> > > > >> > > >>> > Cheng
>>> > > > >> > > >>> >
>>> > > > >> > > >>> >
>>> > > > >> > > >>> >
>>> > > > >> > > >>> > &nbsp;
>>> > > > >> > > >>>
>>> > > > >> > > >>
>>> > > > >> > >
>>> > > > >> >
>>> > > > >>
>>> > > > >>
>>> > > > >> --
>>> > > > >> Lorenzo Affetti
>>> > > > >> Senior Software Engineer @ Flink Team
>>> > > > >> Ververica <http://www.ververica.com>
>>> > > > >>
>>> > > > >
>>> > > >
>>> > > > --
>>> > > > Lorenzo Affetti
>>> > > > Senior Software Engineer @ Flink Team
>>> > > > Ververica <http://www.ververica.com>
>>> >
>>>
>>
>
> --
> Lorenzo Affetti
> Senior Software Engineer @ Flink Team
> Ververica <http://www.ververica.com>
>

Reply via email to