Hi Lorenzo,

Thank you for your comments. There are lots of cool things to digest here,
and I would be eager to hear more people's thoughts.

Regarding the API
table.newScan()
     .withSource(ScanSource.LIVE_KV)   // vs SNAPSHOT, LOG
     .execute()

Currently, the scan method is based on the table creation, so I want to
differentiate between a Scan (streaming consumption of a log) and a KvScan
(a bounded, full-table RocksDB query). There is also some functionality
that we might not want to expose to the user, for example anything about
the snapshot. That should remain internal functionality.

*Is the ScannerManager / session state actually required?*
Pagination can definitely be a simple approach. However, the key thing with
this design is snapshot isolation across the entire bucket scan. A
ScannerContext takes a RocksDB snapshot and a ResourceGuard.Lease, and all
subsequent batches read from that snapshot. This means:
1. Every key that existed at scan-open time is returned exactly once.
2. No key is duplicated because it was compacted and rewritten between two
requests.
3. No key is skipped because it was deleted and reinserted between two
requests.
With cursor pagination, you lose that guarantee.
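
To make the difference concrete, here is a minimal, self-contained Java
sketch. It does not use Fluss or RocksDB: a TreeMap stands in for the
bucket's KV store, a copy of the map stands in for the RocksDB snapshot,
and all class and method names are hypothetical. A write that moves 100
from key "a" to key "d" lands between the first and the second batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class SnapshotVsCursorScan {

    /** Returns up to pageSize keys strictly after cursor (null = start of the table). */
    static List<String> nextKeys(NavigableMap<String, Integer> view, String cursor, int pageSize) {
        NavigableMap<String, Integer> tail =
                (cursor == null) ? view : view.tailMap(cursor, false);
        List<String> keys = new ArrayList<>();
        for (String key : tail.keySet()) {
            if (keys.size() == pageSize) {
                break;
            }
            keys.add(key);
        }
        return keys;
    }

    /** Scans the view in batches of 2 and sums the values; the write runs after batch 1. */
    static int scanTotal(NavigableMap<String, Integer> view, Runnable concurrentWrite) {
        int total = 0;
        String cursor = null;
        boolean writePending = true;
        while (true) {
            List<String> page = nextKeys(view, cursor, 2);
            if (page.isEmpty()) {
                return total;
            }
            for (String key : page) {
                total += view.get(key);
            }
            cursor = page.get(page.size() - 1);
            if (writePending) {
                concurrentWrite.run(); // simulated writer between two requests
                writePending = false;
            }
        }
    }

    static NavigableMap<String, Integer> newStore() {
        NavigableMap<String, Integer> store = new TreeMap<>();
        store.put("a", 100);
        store.put("b", 0);
        store.put("c", 0);
        store.put("d", 0);
        return store;
    }

    /** Cursor pagination: every batch reads the live store. */
    static int cursorScanTotal() {
        NavigableMap<String, Integer> store = newStore();
        return scanTotal(store, () -> { store.put("a", 0); store.put("d", 100); });
    }

    /** Session scan: every batch reads the view frozen at scan-open time. */
    static int snapshotScanTotal() {
        NavigableMap<String, Integer> store = newStore();
        // Assumption: a full copy stands in for a RocksDB snapshot (which does not copy data).
        NavigableMap<String, Integer> snapshot = new TreeMap<>(store);
        return scanTotal(snapshot, () -> { store.put("a", 0); store.put("d", 100); });
    }

    public static void main(String[] args) {
        System.out.println("cursor pagination total: " + cursorScanTotal());   // 200
        System.out.println("snapshot scan total:     " + snapshotScanTotal()); // 100
    }
}
```

The snapshot scan sums to 100, the state at scan-open time. The cursor
scan reads "a" before the write and "d" after it, so it sums to 200, a
state that never existed in the store at any point in time.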


*Could it be a single streaming RPC?*
This would be a great approach, but my understanding is that we can't
support it. The Fluss RPC layer is a custom Netty-based request/response
protocol, not gRPC, so introducing a server-streaming primitive would
require substantial changes and effort. Let me know if I'm mistaken.

Best,
Giannis


On Fri, Mar 13, 2026 at 2:34 PM Lorenzo Affetti via dev <
[email protected]> wrote:

> Hello Giannis!
> Very instructive read. I went through it and was actually astonished
> that this was not the "default" mode in which Fluss operates on PK tables
> when a snapshot is requested.
> Very good that we have this initiative.
>
> *Is a new scanner type explicitly required?*
>
> I don't think it needs to be addressed now, but a good thing to keep in
> mind after this FIP gets implemented.
>
>
> The cleaner design would arguably be to unify these under a single
> abstraction where the scan source (live RocksDB vs. remote snapshot) and
> chunking behavior are implementation details, not separate scanner classes.
> The KvScan interface being introduced (table.newKvScan()) is a step in that
> direction but it still sits alongside rather than replacing the existing
> scan path.
>
> A truly unified design might look like:
>
> table.newScan()
>      .withSource(ScanSource.LIVE_KV)   // vs SNAPSHOT, LOG
>      .execute()
>
> I hope we keep this in mind for future, relevant situations.
>
> ---------
>
> *Is the ScannerManager / session state actually required?*
>
> The session exists because of the chunked, multi-RPC design -- the server
> needs to remember where the iterator is between calls.
> But is that statefulness necessary at all?
>
> The stateless alternative: cursor-based pagination
>
> Instead of keeping a server-side iterator session, you could do stateless
> resumption using a last-seen key as a cursor:
>
> Request 1: scan from beginning   -> returns rows + last_key="key_0500"
> Request 2: scan from "key_0500"  -> returns rows + last_key="key_1000"
> Request 3: scan from "key_1000"  -> ...
>
> Each request is fully independent. The server opens a fresh RocksDB
> iterator, seeks to the cursor key, reads a batch, and closes it. No
> session, no TTL, no ScannerManager.
>
> Advantages:
>
>    - Massively simpler server side -- no session lifecycle, no TTL reaper,
>    no leadership fencing complexity
>    - Naturally resilient to server restarts and leadership changes --
>    client just retries from the last cursor
>    - No SCANNER_EXPIRED / UNKNOWN_SCANNER_ID error classes needed
>
> Tradeoffs:
>
>    - *Consistency weakens slightly* -- each batch opens a fresh RocksDB
>    snapshot, so you might see a key move between batches if it was updated
>    between requests. With the session approach, the entire bucket scan is
>    under one snapshot.
>    - Seek cost -- RocksDB iterator seeks are not free, especially across
>    many SST files. For very large tables with many chunks this adds up,
>    though for the small key spaces FIP-17 targets it's likely negligible.
>    - Cursor encoding needs care for binary keys.
>
> Could it be a single streaming RPC?
>
> Rather than a request/response sequence with session state, you'd have a
> single server-streaming RPC:
>
> client -> ScanKvRequest (open)
> server -> stream of ScanKvResponse chunks
> server -> closes stream when exhausted
>
> If this is possible, the entire ScannerManager session complexity
> evaporates -- the iterator just lives for the duration of the stream, held
> naturally by the connection.
>
>
>
> On Tue, Mar 10, 2026 at 9:59 AM Keith Lee <[email protected]>
> wrote:
>
> > Hello Giannis,
> >
> > Thank you for the update to the proposal! Quickly skimmed through and I
> > like the updates that you’ve made! Questions / comments below:
> >
> > 1. You mentioned an extra section on heartbeat in the FIP, but I do not see
> > heartbeat mentioned in the latest version of the FIP. +1 if the proposal
> > is updated to rely solely on last scan for TTL and remove heartbeat; it’s a
> > great change. If I remember correctly, the previous version used heartbeat
> > as keepalive, which carries a risk of an unclosed, idle scanner holding
> > resources on the server side indefinitely and causing a leak.
> >
> > 2. On continuation request, should we check lastAccessTimeMs and reject if
> > the elapsed time is larger than the TTL? Otherwise sessions can idle between
> > 60 and 90 seconds (TTL + reaper interval). This might be exacerbated if the
> > user configures a particularly high TTL and reaper interval.
> >
> > 3. On SCANNER_EXPIRED, is it necessary to have a separate error for an
> > expired scanner? We can have a single UNKNOWN_OR_EXPIRED_SCANNER (renaming
> > UNKNOWN_SCANNER_ID). Both are terminal and non-retriable, so I imagine that
> > handling them on the client side would not differ. It’s also a small
> > simplification to the implementation.
> >
> > 4. On pipelining: if the user queries for top-n every 10 seconds to update
> > a leaderboard, would pipelining cause unnecessarily high traffic? E.g. they
> > only care about n records, but pipelining automatically fetches up to 8 MB.
> >
> > 5. Also on pipelining: while it seems that we’re keeping the Flink connector
> > out of scope, IIRC the Flink split fetcher also pipelines. If we use this to
> > update the Flink connector, we’d have a larger amount buffered in the pipeline.
> >
> > 6. On expiration interval, should we hide that configuration and only
> > expose it if there’s a strong need for it? That means fewer configs for users
> > to reason about, and 30s expiration sounds like a good starting point.
> >
> > Best regards
> >
> > Keith Lee
> >
> >
> > On Tue, 10 Mar 2026 at 08:49, Giannis Polyzos <[email protected]>
> > wrote:
> >
> > > Hi devs,
> > > Let me know if there are any comments here; otherwise I would like to
> > > start a vote thread.
> > >
> > > Best,
> > > Giannis
> > >
> > > On Thu, 5 Mar 2026 at 3:38 PM, Giannis Polyzos <[email protected]>
> > > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > After a long time, I would like to reinitiate the discussions on FIP-17.
> > > >
> > > > I made quite a few updates to the FIP, which you can find here:
> > > >
> > > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Primary+Key+Table+Snapshot+Queries
> > > > and updated the title to better reflect the goal. Let me know if it
> > > > makes sense.
> > > >
> > > > Moreover, at the end of the proposal, you will find a section called
> > > > *extras*, which has a suggestion for a heartbeat mechanism. However, during
> > > > my PoC, I found that this is not really needed, but I would like your
> > > > thoughts and feedback first.
> > > >
> > > > Best,
> > > > Giannis
> > > >
> > > > On Wed, Oct 29, 2025 at 2:45 PM Giannis Polyzos <[email protected]>
> > > > wrote:
> > > >
> > > >> Yang, thank you for your thoughtful comments.
> > > >>
> > > >> Indeed, we are streaming the results to the client; however, it's still a
> > > >> batch operation. We could use "KV store (or PK table) Snapshot Query" or
> > > >> something similar, since we are querying a RocksDB snapshot. WDYT?
> > > >> The newly introduced KvBatchScanner should be reusable both from the
> > > >> client itself (assume a scenario where I want to periodically query the
> > > >> full RocksDB KV store to power real-time dashboards) and from Flink
> > > >> (with more engines to follow later).
> > > >> It issues requests to fetch the results per bucket and transmits them back
> > > >> to the client.
> > > >>
> > > >> > Could you elaborate on why the new KvBatchScanner isn't reusable?
> > > >> I think the reasoning here is that each request creates a new
> > > >> KvBatchScanner, which polls the records and then closes automatically. Is
> > > >> there any reason you see this as a limitation, and should we consider
> > > >> making it reusable?
> > > >>
> > > >> The design mainly targets the Fluss client API. Should we add an
> > > >> integration design with Flink? Wang Cheng, WDYT?
> > > >>
> > > >> Best,
> > > >> Giannis
> > > >>
> > > >>
> > > >>
> > > >> On Tue, Oct 28, 2025 at 4:44 AM Yang Wang <[email protected]>
> > > >> wrote:
> > > >>
> > > >>> Hi Cheng,
> > > >>>
> > > >>> Thank you for driving this excellent work! Your FIP document shows great
> > > >>> thought and initiative. I've gone through it and have some questions and
> > > >>> suggestions that I hope can further enhance this valuable contribution.
> > > >>>
> > > >>> 1. Regarding the title, I believe we could consider changing it to
> > > >>> "Support full scan in batch mode for PrimaryKey Table". The term
> > > >>> "Streaming" might cause confusion with Flink's streaming/batch modes, and
> > > >>> this revised title would provide better clarity.
> > > >>>
> > > >>> 2. In the Motivation section, I think there are two particularly important
> > > >>> benefits worth highlighting: (1) OLAP engines will be able to perform full
> > > >>> snapshot reads on Fluss primary-key tables. (2) This approach can replace
> > > >>> the current KvSnapshotBatchScanner, allowing the Fluss client to eliminate
> > > >>> its RocksDB dependency entirely.
> > > >>>
> > > >>> 3. Concerning the Proposed Changes, could you clarify when exactly the
> > > >>> client creates a KV snapshot on the server side, and when we send the
> > > >>> bucket_scan_req?
> > > >>>
> > > >>> Let me share my thinking on this: When Flink attempts to read from a
> > > >>> PrimaryKey table, the FlinkSourceEnumerator in the JobMaster generates
> > > >>> HybridSnapshotLogSplit instances and dispatches them to SplitReaders
> > > >>> running on the TaskManager. The JobMaster doesn't actually read data; it
> > > >>> merely defines and manages the splits. Therefore, we need to ensure the JM
> > > >>> has sufficient information to determine the boundary of the KV snapshot
> > > >>> and the startOffset of the LogSplit.
> > > >>>
> > > >>> I suggest we explicitly create a snapshot (or as you've termed it, a
> > > >>> new_scan_request) on the server side. This way, the FlinkSourceEnumerator
> > > >>> can use it to define a HybridSnapshotLogSplit, and the SplitReaders can
> > > >>> perform pollBatch operations on this snapshot (which would be bound to the
> > > >>> specified scanner_id).
> > > >>>
> > > >>> 4. Could you elaborate on why the new KvBatchScanner isn't reusable? What's
> > > >>> the reasoning behind this limitation? (I believe RocksDB iterators do
> > > >>> support the seekToFirst operation.) If a TaskManager fails over before a
> > > >>> checkpoint, rescanning an existing snapshot seems like a natural
> > > >>> requirement.
> > > >>>
> > > >>> 5. I think it would be beneficial to include some detailed design aspects
> > > >>> regarding Flink's integration with the new BatchScanner.
> > > >>>
> > > >>> Overall, this is a solid foundation for an important enhancement. Looking
> > > >>> forward to discussing these points further!
> > > >>>
> > > >>> Best regards, Yang
> > > >>>
> > > >>> Wang Cheng <[email protected]> wrote on Wed, Oct 22, 2025 at 17:09:
> > > >>>
> > > >>> > Hi all,
> > > >>> >
> > > >>> >
> > > >>> > As of v0.8, Fluss only supports KV snapshot batch scan and limit KV batch
> > > >>> > scan. The former approach is constrained by snapshot availability and
> > > >>> > remote storage performance, while the latter is only applicable to
> > > >>> > queries with a LIMIT clause and risks high memory pressure.
> > > >>> >
> > > >>> >
> > > >>> > To address those limitations, Giannis Polyzos and I are writing to propose
> > > >>> > FIP-17: a general-purpose streaming KV scan for Fluss [1].
> > > >>> >
> > > >>> >
> > > >>> > Any feedback and suggestions on this proposal are welcome!
> > > >>> >
> > > >>> >
> > > >>> > [1]:
> > > >>> > https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Streaming+KV+Scan+RPC
> > > >>> >
> > > >>> > Regards,
> > > >>> > Cheng
> > > >>> >
> > > >>> >
> > > >>> >
> > > >>>
> > > >>
> > >
> >
>
>
> --
> Lorenzo Affetti
> Senior Software Engineer @ Flink Team
> Ververica <http://www.ververica.com>
>
