Hi Liebing,

Thanks for the thoughtful feedback. We indeed need to consider the problem of
partition expiration. In the short term, I personally think we could handle it
by simply preventing expired partitions from triggering failover in read jobs
or tiering jobs. There's no need to introduce partition consumer logic, which
would increase implementation complexity. What do you think?
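To make the short-term option concrete, here is a minimal sketch of the idea
(the class and method names below are hypothetical, not actual Fluss client
APIs): on each assignment refresh, the reader simply drops partitions the
server reports as expired instead of treating them as a failover condition.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch only: ExpiredPartitionSkippingReader and
// retainLivePartitions are placeholder names, not Fluss classes.
public class ExpiredPartitionSkippingReader {

    /**
     * Filters out partitions the server reports as expired (TTL), so the
     * read job keeps running on the remaining partitions instead of
     * triggering failover.
     */
    static List<Long> retainLivePartitions(List<Long> assigned, Set<Long> expired) {
        return assigned.stream()
                .filter(p -> !expired.contains(p))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Long> assigned = List.of(1L, 2L, 3L);
        Set<Long> expired = Set.of(2L); // partition 2 expired via TTL
        System.out.println(retainLivePartitions(assigned, expired)); // [1, 3]
    }
}
```

The trade-off is visible in the sketch: any unconsumed tail of an expired
partition is silently skipped, which is exactly the stream/batch consistency
gap you describe, so this is only a stopgap until a partition consumer exists.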

Best regards,
Yunhong

On 2025/12/18 07:01:10 Liebing Yu wrote:
> Hi Yunhong!
> 
> Thanks for your great work. I have a few questions:
> 
> 1. Regarding Partition Expiration and Data Consistency:
> For KV tables, snapshot expiration significantly impacts consumers.
> However, for both KV and Log tables, partition expiration is another
> critical factor. Imagine a Flink job reading a Fluss partition that expires
> due to TTL; in this case, data already written to that partition will not
> be fully consumed.
> 
> This creates a more pronounced data consistency issue in "union read"
> scenarios (integrated lake-stream tables). Consider this scenario:
> A Flink job starts consuming a partition where some data has already been
> synced to the lake by the tiering service. If the partition expires in
> Fluss during the consumption process (even if its data has been fully
> synced to the lake), the Flink job will end up with incomplete streaming
> data. However, a subsequent batch read of the same partition would retrieve
> the full data from the lake. This leads to a stream-batch inconsistency—the
> same partition yields different results depending on whether it's read via
> the stream or the batch path.
> 
> 2. Regarding the Consumer Framework:
> Beyond the KV consumer, the issues mentioned above suggest that we may need
> a Partition Consumer in the future. In fact, it might be beneficial to
> introduce a generic Consumer concept for any ordinary job, allowing Fluss
> to monitor the consumption progress of various clients.
> 
> My question is: Should we take these factors into account now and establish
> a more comprehensive consumer framework? This would make it much easier to
> integrate new consumer types in the future.
> 
> 
> Best regards,
> Liebing Yu
> 
> 
> On Tue, 16 Dec 2025 at 19:33, Yunhong Zheng <[email protected]> wrote:
> 
> > Hi Yang,
> >
> > Thanks for the thoughtful feedback. Your analysis has significantly
> > improved the stability of this feature. I will address each question
> > individually:
> >
> > Regarding question 1:
> >
> > Good catch! I encountered this issue during testing as well. The key
> > problem is the time gap between fetching the latest kv snapshot and
> > registering the kv snapshot consumer, which are not atomic operations. This
> > was considered during the design phase, but to avoid altering the semantics
> > of Admin#getLatestKvSnapshots, we chose to keep registering the kv snapshot
> > consumer as a separate API.
> >
> > The solution is to add a retry mechanism: the buckets in the
> > failedTableBucketSet of the RegisterKvSnapshotResult returned by
> > registerKvSnapshotConsumer must be retried (get latest snapshot ->
> > register kv snapshot consumer) until all buckets are registered
> > successfully.
> >
> > ------
> > Regarding question 2:
> >
> > It is indeed worth retaining multiple snapshots per table bucket to
> > reduce the impact on stability. The configuration
> > 'kv.snapshot.num-retained' will also be kept as is for now. However, there
> > might still be a potential issue: excessive ZooKeeper metadata, which can
> > be addressed in a separate FIP or issue.
> >
> > ------
> > Regarding question 3:
> >
> > The following scenarios can be considered rare cases or misuse:
> >
> > 1. A job fails over continuously and is only restored after a day or longer.
> > 2. Consuming a snapshot takes an exceptionally long time.
> > 3. The same consumerId is used by multiple jobs to consume multiple tables.
> >
> > These scenarios may lead to the snapshot being cleared before the user
> > consumes it. For such cases, we can document them in the user guide, and
> > users can also configure a larger expiration time to mitigate this problem.
> >
> >
> > -----
> > Regarding question 4:
> >
> > This question involves a trade-off between two strategies:
> >
> > 1. Expiration time based on the first registration: This accelerates the
> > consumer's TTL, preventing snapshots from being retained for extended
> > periods and avoiding metadata bloat.
> > 2. Expiration time based on the latest registration: This minimizes the
> > risk of snapshots being cleaned up during consumption.
> >
> > I personally lean toward the first strategy, but the second is also valid.
> >
> > Best regards,
> > Yunhong
> >
> > On 2025/12/16 08:19:54 Yang Wang wrote:
> > > Hi Yunhong,
> > >
> > > Thank you for driving this very useful initiative—it will significantly
> > > improve the stability of data consumption and the overall user experience.
> > > I generally agree with most aspects of the design, but I have a few minor
> > > questions:
> > >
> > >    1. When Flink enumerates the snapshots it needs to subscribe to and
> > >    attempts to subscribe, what is the expected behavior if a snapshot
> > >    gets deleted just before the subscription request is sent? (This
> > >    scenario might occur in certain edge cases.)
> > >
> > >    2. Retaining only one snapshot by default could be risky. Currently,
> > >    neither the CoordinatorServer nor the TabletServer can fully guarantee
> > >    that snapshots registered in ZooKeeper are always reliable and valid
> > >    (although this is the design intention, there might still be
> > >    undiscovered bugs).
> > >
> > >    3. I didn’t see any API for updating subscriptions. If a Flink job
> > >    experiences a prolonged failover for some reason, would the unread
> > >    snapshots expire as a result?
> > >
> > >    4. Related to the previous question: Is the TTL of a snapshot
> > >    calculated from the time it was registered, or from the last time a
> > >    consumer updated its subscription?
> > >
> > > Best regards,
> > > Yang
> > >
> > > yunhong Zheng <[email protected]> wrote on Thu, 11 Dec 2025 at 22:40:
> > >
> > > > Hi all,
> > > >
> > > > Currently, for Fluss PrimaryKey Table, the number of kv snapshots
> > > > retained per bucket is controlled by the server
> > > > option `kv.snapshot.num-retained` (default value: 1). If this value
> > > > is set too small, kv snapshots that are being actively consumed may
> > > > be deleted while a consumer is still consuming them.
> > > >
> > > > In this case, a Flink job reading the PrimaryKey table will fail and
> > > > cannot be restarted from its previous state.
> > > >
> > > > To avoid this, the Fluss server needs to be aware of which consumers
> > > > are actively consuming the corresponding kv snapshots, and must not
> > > > delete kv snapshots that are currently being consumed.
> > > >
> > > > So, I'd like to propose FIP-22: Support Kv Snapshot Consumer[1].
> > > >
> > > > Any feedback or suggestions on this proposal are welcome!
> > > >
> > > > [1]:
> > > > https://cwiki.apache.org/confluence/display/FLUSS/FIP-22+Support+Kv+Snapshot+Consumer
> > > >
> > > > Regards,
> > > > Yunhong
> > > >
> > >
> >
> 
