Hello Giannis!
Very instructive read, I went through that and I was actually astonished
that this was not the "default" mode that Fluss operates on PK tables when
a snapshot is requested.
Very good that we have this initiative.
*Is a new scanner type explicitly required?*
I don't think it needs to be addressed now, but a good thing to keep in
mind after this FIP gets implemented.
The cleaner design would arguably be to unify these under a single
abstraction where the scan source (live RocksDB vs. remote snapshot) and
chunking behavior are implementation details, not separate scanner classes.
The KvScan interface being introduced (table.newKvScan()) is a step in that
direction but it still sits alongside rather than replacing the existing
scan path.
A truly unified design might look like:
table.newScan()
.withSource(ScanSource.LIVE_KV) // vs SNAPSHOT, LOG
.execute()
I hope we keep this in mind for future, relevant situations.
---------
*Is the ScannerManager / session state actually required?*
The session exists because of the chunked, multi-RPC design -- the server
needs to remember where the iterator is between calls.
But is that statefulness necessary at all?
The stateless alternative: cursor-based pagination
Instead of keeping a server-side iterator session, you could do stateless
resumption using a last-seen key as a cursor:
Request 1: scan from beginning -> returns rows + last_key="key_0500"
Request 2: scan from "key_0500" -> returns rows + last_key="key_1000"
Request 3: scan from "key_1000" -> ...
Each request is fully independent. The server opens a fresh RocksDB
iterator, seeks to the cursor key, reads a batch, and closes it. No
session, no TTL, no ScannerManager.
Advantages:
- Massively simpler server side -- no session lifecycle, no TTL reaper,
no leadership fencing complexity
- Naturally resilient to server restarts and leadership changes --
client just retries from the last cursor
- No SCANNER_EXPIRED / UNKNOWN_SCANNER_ID error classes needed
Tradeoffs:
- *Consistency weakens slightly* -- each batch opens a fresh RocksDB
snapshot, so you might see a key move between batches if it was updated
between requests. With the session approach, the entire bucket scan is
under one snapshot.
- Seek cost -- RocksDB iterator seeks are not free, especially across
many SST files. For very large tables with many chunks this adds up, though
for the small key spaces FIP-17 targets it's likely negligible.
- Cursor encoding needs care for binary keys.
Could it be a single streaming RPC?
Rather than a request/response sequence with session state, you'd have a
single server-streaming RPC:
client -> ScanKvRequest (open)
server -> stream of ScanKvResponse chunks
server -> closes stream when exhausted
If this is possible, the entire ScannerManager session complexity
evaporates -- the iterator just lives for the duration of the stream, held
naturally by the connection.
On Tue, Mar 10, 2026 at 9:59 AM Keith Lee <[email protected]>
wrote:
> Hello Giannis,
>
> Thank you for the update to the proposal! Quickly skimmed through and I
> like the updates that you’ve made! Questions / comments below:
>
> 1. You mentioned an extra section on heartbeat on the FIP, but I do not see
> heartbeat being mentioned on the latest version of the FIP? +1 If the
> proposal is updated to rely solely on last scan for TTL and remove
> heartbeat, it’s a great change. If I remember correctly, the previous
> version was to use heartbeat as keepalive, there is a risk of unclosed,
> idle scanner holding resources on server side indefinitely and causing
> leak.
>
> 2. On continuation request, should we check lastAccessTimeMs and reject if
> elapsed time is larger than TTL? Otherwise sessions can idle between 60 and
> 90 (TTL+ reaper interval). This might be exacerbated if user configure
> particularly high TTL and reaper interval.
>
> 3. On SCANNER_EXPIRED, is it necessary to have a separate error for expired
> scanner? We can have a single UNKNOWN_OR_EXPIRED_SCANNER (renaming
> UNKNOWN_SCANNER_ID). These are both terminal and non retriable, I imagine
> that handling it from client side would not differ. It’s also a small
> simplification to the implementation.
>
> 4. On pipelining. If the user queries for top-n every 10 seconds to update
> leaderboard, would pipelining cause higher unnecessary traffic? E.g. they
> only care about n records but pipelining automatically fetch up to 8mb.
>
> 5. Also on pipelining, while it seems that we’re keeping Flink connector
> out of scope, IIRC Flink split fetcher also pipelines. If we use this to
> update Flink connector, we’d have higher amount buffered in pipeline.
>
> 6. On expiration interval, should we hide that configuration and choose to
> expose it if there’s a strong need for it? It’s fewer config for users to
> reason about and 30s expiration sounds like a good starting point.
>
> Best regards
>
> Keith Lee
>
>
> On Tue, 10 Mar 2026 at 08:49, Giannis Polyzos <[email protected]>
> wrote:
>
> > Hi devs,
> > Let me know if there are any comments here, otherwise I would like to
> start
> > a vote thread.
> >
> > Best,
> > Giannis
> >
> > On Thu, 5 Mar 2026 at 3:38 PM, Giannis Polyzos <[email protected]>
> > wrote:
> >
> > > Hi devs,
> > >
> > > After a long time, i will like to reinitiate the discussions on FIP-17.
> > >
> > > I made quite a few updates on the FIP, which you can find here:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Primary+Key+Table+Snapshot+Queries
> > > and updated the title to better reflect the goal. Let me know if it
> makes
> > > sense.
> > >
> > > Moreover in the end of the proposal, you will find a section as *extras
> > *which
> > > has a suggestion for a heartbeat mechanism. However, during my PoC, I
> > found
> > > that this is not really needed, but
> > > I would like your thoughts and feedback first.
> > >
> > > Best,
> > > Giannis
> > >
> > > On Wed, Oct 29, 2025 at 2:45 PM Giannis Polyzos <[email protected]
> >
> > > wrote:
> > >
> > >> Yang, thank you for your thoughtful comments.
> > >>
> > >> Indeed, we are streaming the results to the client; however, it's
> still
> > a
> > >> batch operation. We could use "KV store (or PK table) Snapshot Query"
> > or
> > >> something similar, since we are querying a RocksDB snapshot. WDYT?
> > >> The newly introduced KvBatchScanner should be able to be reused from
> > both
> > >> the client itself - assume a scenario that I want to periodically
> query
> > the
> > >> full RocksDB KV store to power real-time dashboards - as well as Flink
> > >> (with more engines to follow later).
> > >> It issues requests to fetch the results per bucket and transmit them
> > back
> > >> to the client.
> > >>
> > >> > Could you elaborate on why the new KvBatchScanner isn't reusable?
> > >> I think the reasoning here is that reach requests create a new
> > >> KvBatchScanner, which polls the records and then closes automatically.
> > Any
> > >> reason you see this as a limitation, and we should consider making it
> > >> reusable?
> > >>
> > >> The design aims mainly for the Fluss client API.. Should we add an
> > >> integration design with Flink? Wang Cheng, WDYT?
> > >>
> > >> Best,
> > >> Giannis
> > >>
> > >>
> > >>
> > >> On Tue, Oct 28, 2025 at 4:44 AM Yang Wang <[email protected]>
> > >> wrote:
> > >>
> > >>> Hi Cheng,
> > >>>
> > >>> Thank you for driving this excellent work! Your FIP document shows
> > great
> > >>> thought and initiative. I've gone through it and have some questions
> > and
> > >>> suggestions that I hope can further enhance this valuable
> contribution.
> > >>>
> > >>> 1、Regarding the Title, I believe we could consider changing it to
> > >>> "Support
> > >>> full scan in batch mode for PrimaryKey Table". The term "Streaming"
> > might
> > >>> cause confusion with Flink's streaming/batch modes, and this revised
> > >>> title
> > >>> would provide better clarity.
> > >>>
> > >>> 2、In the Motivation section, I think there are two particularly
> > important
> > >>> benefits worth highlighting: (1) OLAP engines will be able to perform
> > >>> full
> > >>> snapshot reads on Fluss primary-key tables. (2) This approach can
> > replace
> > >>> the current KvSnapshotBatchScanner, allowing the Fluss client to
> > >>> eliminate
> > >>> its RocksDB dependency entirely.
> > >>>
> > >>> 3、Concerning the Proposed Changes, could you clarify when exactly the
> > >>> client creates a KV snapshot on the server side, and when we send the
> > >>> bucket_scan_req?
> > >>>
> > >>> Let me share my thinking on this: When Flink attempts to read from a
> > >>> PrimaryKey table, the FlinkSourceEnumerator in the JobMaster
> generates
> > >>> HybridSnapshotLogSplit and dispatches them to SplitReaders running on
> > the
> > >>> TaskManager. The JobMaster doesn't actually read data—it merely
> defines
> > >>> and
> > >>> manages the splits. Therefore, we need to ensure the JM has
> sufficient
> > >>> information to determine the boundary of the KV snapshot and the
> > >>> startOffset of the LogSplit.
> > >>>
> > >>> I suggest we explicitly create a snapshot (or as you've termed it, a
> > >>> new_scan_request) on the server side. This way, the
> > FlinkSourceEnumerator
> > >>> can use it to define a HybridSnapshotLogSplit, and the SplitReaders
> can
> > >>> perform pollBatch operations on this snapshot (which would be bound
> to
> > >>> the
> > >>> specified scanner_id).
> > >>>
> > >>> 4、 Could you elaborate on why the new KvBatchScanner isn't reusable?
> > >>> What's
> > >>> the reasoning behind this limitation? (I believe RocksDB iterators do
> > >>> support the seekToFirst operation.) If a TaskManager fails over
> before
> > a
> > >>> checkpoint, rescanning an existing snapshot seems like a natural
> > >>> requirement.
> > >>>
> > >>> 5、I think it would be beneficial to include some detailed design
> > aspects
> > >>> regarding Flink's integration with the new BatchScanner.
> > >>>
> > >>> Overall, this is a solid foundation for an important enhancement.
> > Looking
> > >>> forward to discussing these points further!
> > >>>
> > >>> Best regards, Yang
> > >>>
> > >>> Wang Cheng <[email protected]> 于2025年10月22日周三 17:09写道:
> > >>>
> > >>> > Hi all,
> > >>> >
> > >>> >
> > >>> > As of v0.8, Fluss only supports KV snapshot batch scan and limit KV
> > >>> batch
> > >>> > scan. The former approach is constrained by snapshot availability
> and
> > >>> > remote storage performance, while the later one is only applicable
> to
> > >>> > queries with LIMIT clause and risks high memory pressure.
> > >>> >
> > >>> >
> > >>> > To address those limitations, Giannis Polyzos and I are writing to
> > >>> propose
> > >>> > FIP-17: a general-purpose streaming KV scan for Fluss [1].
> > >>> >
> > >>> >
> > >>> > Any feedback and suggestions on this proposal are welcome!
> > >>> >
> > >>> >
> > >>> > [1]:
> > >>> >
> > >>>
> >
> https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Streaming+KV+Scan+RPC
> > >>> >
> > >>> > Regards,
> > >>> > Cheng
> > >>> >
> > >>> >
> > >>> >
> > >>> >
> > >>>
> > >>
> >
>
--
Lorenzo Affetti
Senior Software Engineer @ Flink Team
Ververica <http://www.ververica.com>