Hi Giannis,

Thanks for driving this. I only have a few questions on the FIP.

(1) Scan API
The FIP says "Remove the LIMIT requirement from
TableScan.createBatchScanner(TableBucket) for PK tables" which reuses
the existing "Table.newScan.createBatchScan(..)" API. But the Public
Interface section introduces a new API, "Table.newKvScan.execute()".

I believe extending the existing "Table.newScan().createBatchScan()"
to support non-LIMIT queries would provide a more unified approach.
Since this API already handles KV scan functionality, introducing a
separate KvScan interface could confuse users regarding which method
to choose.

(2) The protobuf definition currently uses `uint32` and `uint64` for
several fields. Since Java lacks native support for unsigned integers,
this forces us to rely on `Integer.toUnsignedLong()` wrappers
throughout the codebase, adding unnecessary complexity. Switching to
`int32` and `int64` would significantly simplify the logic and improve
code readability.
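
To make the point concrete, here is a minimal sketch of what the unsigned
fields cost on the Java side. protobuf-java surfaces `uint32` as `int` and
`uint64` as `long`, so large values show up negative unless they are wrapped.
The class and method names below are made up for illustration and are not
taken from the FIP.

```java
final class UnsignedFieldDemo {

    // A proto `uint32` field arrives in Java as a plain signed int,
    // so it has to be widened before it can be used as a size or count.
    static long decodeUint32(int wireValue) {
        return Integer.toUnsignedLong(wireValue);
    }

    // A proto `uint64` field arrives as a signed long; the ordinary `<`
    // operator gives the wrong answer above Long.MAX_VALUE.
    static boolean uint64LessThan(long a, long b) {
        return Long.compareUnsigned(a, b) < 0;
    }

    public static void main(String[] args) {
        // Hypothetical uint32 value above Integer.MAX_VALUE, as read off the wire.
        int raw = (int) 4_000_000_000L;
        System.out.println(raw);               // -294967296
        System.out.println(decodeUint32(raw)); // 4000000000

        // Hypothetical uint64 value: all bits set is the largest unsigned value.
        long maxUnsigned = -1L;
        System.out.println(1L < maxUnsigned);                // false (signed compare)
        System.out.println(uint64LessThan(1L, maxUnsigned)); // true  (unsigned compare)
    }
}
```

With `int32` and `int64` none of these helpers are needed, at the price of
giving up the upper half of the range, which these fields are unlikely to use.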

(3) Configuration.
I have a minor suggestion regarding the configuration naming. Since
this scanner is designed exclusively for KV tables, and our existing
server-side KV configurations consistently use the kv.* prefix, I
recommend changing server.scanner.* to kv.scanner.*. This adjustment
will ensure better consistency with our current naming conventions.

Best,
Jark

On Thu, 26 Mar 2026 at 16:44, Lorenzo Affetti via dev
<[email protected]> wrote:
>
> Hey Giannis! Sorry for the late reply.
>
> *Is the ScannerManager / session state actually required?*
>
> *The key thing with this design is snapshot isolation across the entire
> bucket scan.*
> *A ScannerContext takes a RocksDB snapshot and a ResourceGuard.Lease, and
> all subsequent batches read from that snapshot.*
>
> I understand, however, with the same approach, I see easier ways to
> implement this. For example, make the client reply not only with a
> "last_key" but also with a "snapshot_id".
> The server would need to store the open snapshots (this is the stateful
> part) to re-use them upon request.
> You can drop the iterator, but hold a pointer to the snapshot; this should
> tell RocksDB not to compact it out as the snapshot is not released.
>
> This approach would drastically reduce server-side complexity and hold
> significantly less state.
> I think this is worth exploring and, if it really proves impossible, it
> should be added as a rejected alternative.
>
>
> *Could it be a single streaming RPC?*
>
> *Since the Fluss RPC layer is a custom Netty-based request/response
> protocol, not gRPC, introducing a server-streaming primitive would require
> many changes and effort.*
>
> I understand, however, missing such a primitive still impacts Fluss, and it
> seems to me that we are still paying here the cost of implementing a custom
> streaming RPC for a special case.
> It may be worth investing in the transport primitive once rather than
> duplicating this pattern every time a streaming operation is needed.
> One example is log tailing, from polling to true streaming RPC. Another
> example is changelog streaming.
>
> What if we opt for streaming RPC FIP before this one so that this can
> directly benefit from that?
>
>
> On Fri, Mar 13, 2026 at 7:12 PM Giannis Polyzos <[email protected]>
> wrote:
>
> > Hi Lorenzo,
> >
> > And thank you for your comments. Lots of cool things to digest here, and I
> > would be eager to hear more people's thoughts.
> >
> > Regarding the API
> > table.newScan()
> >      .withSource(ScanSource.LIVE_KV)   // vs SNAPSHOT, LOG
> >      .execute()
> >
> > Currently, the scan method is based on the table creation, so I want to
> > differentiate between a scan (streaming consumption of a log) and KvScan (a
> > bounded rocksdb full table query). Then there is also some functionality
> > that we might not want to expose to the user, for example about the
> > snapshot. This should be internal functionality.
> >
> > *Is the ScannerManager / session state actually required?*
> > Pagination can definitely be a simple approach. However, the key thing
> > with this design is snapshot isolation across the entire bucket scan. A
> > ScannerContext takes a RocksDB snapshot and a ResourceGuard.Lease, and all
> > subsequent batches read from that snapshot. This means:
> > 1. Every key that existed at scan-open time is returned exactly once.
> > 2. No key is duplicated because it was compacted and rewritten between two
> > requests.
> > 3. No key is skipped because it was deleted and reinserted between two
> > requests.
> > With cursor pagination, you lose that guarantee.
> >
> >
> > *Could it be a single streaming RPC?*
> > This would be a great approach, but my understanding is that we can't
> > support this. Since the Fluss RPC layer is a custom Netty-based
> > request/response protocol, not gRPC, introducing a server-streaming
> > primitive would require many changes and effort. Let me know if I'm
> > mistaken.
> >
> > Best,
> > Giannis
> >
> >
> > On Fri, Mar 13, 2026 at 2:34 PM Lorenzo Affetti via dev <
> > [email protected]> wrote:
> >
> >> Hello Giannis!
> >> Very instructive read, I went through that and I was actually astonished
> >> that this was not the "default" mode that Fluss operates on PK tables when
> >> a snapshot is requested.
> >> Very good that we have this initiative.
> >>
> >> *Is a new scanner type explicitly required?*
> >>
> >> I don't think it needs to be addressed now, but a good thing to keep in
> >> mind after this FIP gets implemented.
> >>
> >>
> >> The cleaner design would arguably be to unify these under a single
> >> abstraction where the scan source (live RocksDB vs. remote snapshot) and
> >> chunking behavior are implementation details, not separate scanner
> >> classes.
> >> The KvScan interface being introduced (table.newKvScan()) is a step in
> >> that
> >> direction but it still sits alongside rather than replacing the existing
> >> scan path.
> >>
> >> A truly unified design might look like:
> >>
> >> table.newScan()
> >>      .withSource(ScanSource.LIVE_KV)   // vs SNAPSHOT, LOG
> >>      .execute()
> >>
> >> I hope we keep this in mind for future, relevant situations.
> >>
> >> ---------
> >>
> >> *Is the ScannerManager / session state actually required?*
> >>
> >> The session exists because of the chunked, multi-RPC design -- the server
> >> needs to remember where the iterator is between calls.
> >> But is that statefulness necessary at all?
> >>
> >> The stateless alternative: cursor-based pagination
> >>
> >> Instead of keeping a server-side iterator session, you could do stateless
> >> resumption using a last-seen key as a cursor:
> >>
> >> Request 1: scan from beginning   -> returns rows + last_key="key_0500"
> >> Request 2: scan from "key_0500"  -> returns rows + last_key="key_1000"
> >> Request 3: scan from "key_1000"  -> ...
> >>
> >> Each request is fully independent. The server opens a fresh RocksDB
> >> iterator, seeks to the cursor key, reads a batch, and closes it. No
> >> session, no TTL, no ScannerManager.
> >>
> >> Advantages:
> >>
> >>    - Massively simpler server side -- no session lifecycle, no TTL reaper,
> >>    no leadership fencing complexity
> >>    - Naturally resilient to server restarts and leadership changes --
> >>    client just retries from the last cursor
> >>    - No SCANNER_EXPIRED / UNKNOWN_SCANNER_ID error classes needed
> >>
> >> Tradeoffs:
> >>
> >>    - *Consistency weakens slightly* -- each batch opens a fresh RocksDB
> >>    snapshot, so you might see a key move between batches if it was updated
> >>    between requests. With the session approach, the entire bucket scan is
> >>    under one snapshot.
> >>    - Seek cost -- RocksDB iterator seeks are not free, especially across
> >>    many SST files. For very large tables with many chunks this adds up,
> >> though
> >>    for the small key spaces FIP-17 targets it's likely negligible.
> >>    - Cursor encoding needs care for binary keys.
> >>
> >> Could it be a single streaming RPC?
> >>
> >> Rather than a request/response sequence with session state, you'd have a
> >> single server-streaming RPC:
> >>
> >> client -> ScanKvRequest (open)
> >> server -> stream of ScanKvResponse chunks
> >> server -> closes stream when exhausted
> >>
> >> If this is possible, the entire ScannerManager session complexity
> >> evaporates -- the iterator just lives for the duration of the stream, held
> >> naturally by the connection.
> >>
> >>
> >>
> >> On Tue, Mar 10, 2026 at 9:59 AM Keith Lee <[email protected]>
> >> wrote:
> >>
> >> > Hello Giannis,
> >> >
> >> > Thank you for the update to the proposal! Quickly skimmed through and I
> >> > like the updates that you’ve made! Questions / comments below:
> >> >
> >> > 1. You mentioned an extra section on heartbeat on the FIP, but I do not
> >> see
> >> > heartbeat being mentioned on the latest version of the FIP?  +1 If the
> >> > proposal is updated to rely solely on last scan for TTL and remove
> >> > heartbeat, it’s a great change. If I remember correctly, the previous
> >> > version was to use heartbeat as keepalive, there is a risk of unclosed,
> >> > idle scanner holding resources on server side indefinitely and causing
> >> > leak.
> >> >
> >> > 2. On continuation request, should we check lastAccessTimeMs and reject
> >> if
> >> > elapsed time is larger than TTL? Otherwise sessions can idle between 60
> >> and
> >> > 90 (TTL + reaper interval). This might be exacerbated if users configure a
> >> > particularly high TTL and reaper interval.
> >> >
> >> > 3. On SCANNER_EXPIRED, is it necessary to have a separate error for
> >> expired
> >> > scanner? We can have a single UNKNOWN_OR_EXPIRED_SCANNER (renaming
> >> > UNKNOWN_SCANNER_ID). These are both terminal and non retriable, I
> >> imagine
> >> > that handling it from client side would not differ. It’s also a small
> >> > simplification to the implementation.
> >> >
> >> > 4. On pipelining. If the user queries for top-n every 10 seconds to
> >> update
> >> > leaderboard, would pipelining cause higher unnecessary traffic? E.g.
> >> they
> >> > only care about n records but pipelining automatically fetches up to 8 MB.
> >> >
> >> > 5. Also on pipelining, while it seems that we’re keeping Flink connector
> >> > out of scope, IIRC Flink split fetcher also pipelines. If we use this to
> >> > update Flink connector, we’d have a higher amount buffered in the pipeline.
> >> >
> >> > 6. On expiration interval, should we hide that configuration and choose
> >> to
> >> > expose it if there’s a strong need for it? It’s fewer config for users
> >> to
> >> > reason about and 30s expiration sounds like a good starting point.
> >> >
> >> > Best regards
> >> >
> >> > Keith Lee
> >> >
> >> >
> >> > On Tue, 10 Mar 2026 at 08:49, Giannis Polyzos <[email protected]>
> >> > wrote:
> >> >
> >> > > Hi devs,
> >> > > Let me know if there are any comments here, otherwise I would like to
> >> > start
> >> > > a vote thread.
> >> > >
> >> > > Best,
> >> > > Giannis
> >> > >
> >> > > On Thu, 5 Mar 2026 at 3:38 PM, Giannis Polyzos <[email protected]
> >> >
> >> > > wrote:
> >> > >
> >> > > > Hi devs,
> >> > > >
> >> > > > After a long time, I would like to restart the discussion on
> >> FIP-17.
> >> > > >
> >> > > > I made quite a few updates on the FIP, which you can find here:
> >> > > >
> >> > > >
> >> > >
> >> >
> >> https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Primary+Key+Table+Snapshot+Queries
> >> > > > and updated the title to better reflect the goal. Let me know if it
> >> > makes
> >> > > > sense.
> >> > > >
> >> > > > Moreover, at the end of the proposal, you will find a section called
> >> *extras
> >> > > *which
> >> > > > has a suggestion for a heartbeat mechanism. However, during my PoC,
> >> I
> >> > > found
> >> > > > that this is not really needed, but
> >> > > > I would like your thoughts and feedback first.
> >> > > >
> >> > > > Best,
> >> > > > Giannis
> >> > > >
> >> > > > On Wed, Oct 29, 2025 at 2:45 PM Giannis Polyzos <
> >> [email protected]
> >> > >
> >> > > > wrote:
> >> > > >
> >> > > >> Yang, thank you for your thoughtful comments.
> >> > > >>
> >> > > >> Indeed, we are streaming the results to the client; however, it's
> >> > still
> >> > > a
> >> > > >> batch operation. We could use "KV store (or PK table) Snapshot
> >> Query"
> >> > > or
> >> > > >> something similar, since we are querying a RocksDB snapshot. WDYT?
> >> > > >> The newly introduced KvBatchScanner should be able to be reused
> >> from
> >> > > both
> >> > > >> the client itself - assume a scenario that I want to periodically
> >> > query
> >> > > the
> >> > > >> full RocksDB KV store to power real-time dashboards - as well as
> >> Flink
> >> > > >> (with more engines to follow later).
> >> > > >> It issues requests to fetch the results per bucket and transmit
> >> them
> >> > > back
> >> > > >> to the client.
> >> > > >>
> >> > > >> > Could you elaborate on why the new KvBatchScanner isn't reusable?
> >> > > >> I think the reasoning here is that each request creates a new
> >> > > >> KvBatchScanner, which polls the records and then closes
> >> automatically.
> >> > > Any
> >> > > >> reason you see this as a limitation, and we should consider making
> >> it
> >> > > >> reusable?
> >> > > >>
> >> > > >> The design aims mainly for the Fluss client API. Should we add an
> >> > > >> integration design with Flink? Wang Cheng, WDYT?
> >> > > >>
> >> > > >> Best,
> >> > > >> Giannis
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >> On Tue, Oct 28, 2025 at 4:44 AM Yang Wang <
> >> [email protected]>
> >> > > >> wrote:
> >> > > >>
> >> > > >>> Hi Cheng,
> >> > > >>>
> >> > > >>> Thank you for driving this excellent work! Your FIP document shows
> >> > > great
> >> > > >>> thought and initiative. I've gone through it and have some
> >> questions
> >> > > and
> >> > > >>> suggestions that I hope can further enhance this valuable
> >> > contribution.
> >> > > >>>
> >> > > >>> 1. Regarding the Title, I believe we could consider changing it to
> >> > > >>> "Support
> >> > > >>> full scan in batch mode for PrimaryKey Table". The term
> >> "Streaming"
> >> > > might
> >> > > >>> cause confusion with Flink's streaming/batch modes, and this
> >> revised
> >> > > >>> title
> >> > > >>> would provide better clarity.
> >> > > >>>
> >> > > >>> 2. In the Motivation section, I think there are two particularly
> >> > > important
> >> > > >>> benefits worth highlighting: (1) OLAP engines will be able to
> >> perform
> >> > > >>> full
> >> > > >>> snapshot reads on Fluss primary-key tables. (2) This approach can
> >> > > replace
> >> > > >>> the current KvSnapshotBatchScanner, allowing the Fluss client to
> >> > > >>> eliminate
> >> > > >>> its RocksDB dependency entirely.
> >> > > >>>
> >> > > >>> 3. Concerning the Proposed Changes, could you clarify when exactly
> >> the
> >> > > >>> client creates a KV snapshot on the server side, and when we send
> >> the
> >> > > >>> bucket_scan_req?
> >> > > >>>
> >> > > >>> Let me share my thinking on this: When Flink attempts to read
> >> from a
> >> > > >>> PrimaryKey table, the FlinkSourceEnumerator in the JobMaster
> >> > generates
> >> > > >>> HybridSnapshotLogSplit and dispatches them to SplitReaders
> >> running on
> >> > > the
> >> > > >>> TaskManager. The JobMaster doesn't actually read data—it merely
> >> > defines
> >> > > >>> and
> >> > > >>> manages the splits. Therefore, we need to ensure the JM has
> >> > sufficient
> >> > > >>> information to determine the boundary of the KV snapshot and the
> >> > > >>> startOffset of the LogSplit.
> >> > > >>>
> >> > > >>> I suggest we explicitly create a snapshot (or as you've termed
> >> it, a
> >> > > >>> new_scan_request) on the server side. This way, the
> >> > > FlinkSourceEnumerator
> >> > > >>> can use it to define a HybridSnapshotLogSplit, and the
> >> SplitReaders
> >> > can
> >> > > >>> perform pollBatch operations on this snapshot (which would be
> >> bound
> >> > to
> >> > > >>> the
> >> > > >>> specified scanner_id).
> >> > > >>>
> >> > > >>> 4. Could you elaborate on why the new KvBatchScanner isn't
> >> reusable?
> >> > > >>> What's
> >> > > >>> the reasoning behind this limitation? (I believe RocksDB
> >> iterators do
> >> > > >>> support the seekToFirst operation.) If a TaskManager fails over
> >> > before
> >> > > a
> >> > > >>> checkpoint, rescanning an existing snapshot seems like a natural
> >> > > >>> requirement.
> >> > > >>>
> >> > > >>> 5. I think it would be beneficial to include some detailed design
> >> > > aspects
> >> > > >>> regarding Flink's integration with the new BatchScanner.
> >> > > >>>
> >> > > >>> Overall, this is a solid foundation for an important enhancement.
> >> > > Looking
> >> > > >>> forward to discussing these points further!
> >> > > >>>
> >> > > >>> Best regards, Yang
> >> > > >>>
> >> > > >>> On Wed, Oct 22, 2025 at 17:09, Wang Cheng <[email protected]> wrote:
> >> > > >>>
> >> > > >>> > Hi all,
> >> > > >>> >
> >> > > >>> >
> >> > > >>> > As of v0.8, Fluss only supports KV snapshot batch scan and
> >> limit KV
> >> > > >>> batch
> >> > > >>> > scan. The former approach is constrained by snapshot
> >> availability
> >> > and
> >> > > >>> > remote storage performance, while the latter one is only
> >> applicable
> >> > to
> >> > > >>> > queries with LIMIT clause and risks high memory pressure.
> >> > > >>> >
> >> > > >>> >
> >> > > >>> > To address those limitations, Giannis Polyzos and I are writing
> >> to
> >> > > >>> propose
> >> > > >>> > FIP-17: a general-purpose streaming KV scan for Fluss [1].
> >> > > >>> >
> >> > > >>> >
> >> > > >>> > Any feedback and suggestions on this proposal are welcome!
> >> > > >>> >
> >> > > >>> >
> >> > > >>> > [1]:
> >> > > >>> >
> >> > > >>>
> >> > >
> >> >
> >> https://cwiki.apache.org/confluence/display/FLUSS/FIP-17+Streaming+KV+Scan+RPC
> >> > > >>> >
> >> > > >>> > Regards,
> >> > > >>> > Cheng
> >> > > >>> >
> >> > > >>> >
> >> > > >>> >
> >> > > >>>
> >> > > >>
> >> > >
> >> >
> >>
> >>
> >> --
> >> Lorenzo Affetti
> >> Senior Software Engineer @ Flink Team
> >> Ververica <http://www.ververica.com>
> >>
> >
>
> --
> Lorenzo Affetti
> Senior Software Engineer @ Flink Team
> Ververica <http://www.ververica.com>
