Hi Ryan,

Thanks for sharing your thoughts! We initially tried to leave the RPCs as they were, but you're absolutely right that the current KIP feels a bit like a kludge.

If we are open to touching the protocol (and can accept the potential time skew between nodes), I have another idea: what if we add a creation timestamp to both the partition and the group? This information could be returned to the consumer via the new Heartbeat RPC. The async consumer could then use these timestamps to make a deterministic decision between "latest" and "earliest".

This approach would only work for the AsyncConsumer using subscribe() (since it relies on the group state), which is another incentive for users to migrate to the new consumer.

Thoughts on this direction?

Best,
Chia-Ping

Ryan Leslie (BLOOMBERG/ NEW YORK) <[email protected]> wrote on Tue, Mar 10, 2026 at 6:38 AM:

> Hi Chia and Jiunn,
>
> Thanks for the response. I agree that the explicit timestamp gives enough
> flexibility for the user to avoid the issue I mentioned with the implicit
> timestamp at startup not matching the time the group instance started.
>
> One potential downside is that the user may have to store this timestamp
> somewhere in between restarts. For the group instance id, that is not
> always the case, since sometimes it can be derived from the environment, such
> as the hostname, or hardcoded in an environment variable where it typically
> doesn't need to be updated.
>
> Also, since static instances may be long-lived, preserving just the
> initial timestamp of the first instance might feel a bit awkward, since you
> may end up with static instances restarting and passing timestamps that
> could be as old as two months. The user could instead store something
> like the time of the last restart (and subtract metadata max age from it to be
> safe), but that can be considered a burden and may fail if shutdown was not
> graceful, i.e. a crash.
>
> I agree that this KIP provides a workable solution to avoid data loss
> without protocol or broker changes, so I'm +1.
> But it does still feel a little like a kludge, since what the user really
> needs is an easy, almost implicit, way to not lose data when a recently added
> partition is discovered, and currently there is no metadata for the creation
> time of a partition. The user may not even want the same policy applied to
> older partitions whose offsets were deleted.
>
> Even for a consumer group not using static membership, suppose partitions
> are added by a producer and new messages are published. If at the same time
> there is a consumer group, e.g. with only 1 consumer, and it has crashed,
> when it comes back up it may lose messages unless it knows what timestamp
> to pass.
>
> Thanks,
>
> Ryan
>
> From: [email protected] At: 03/07/26 02:46:28 UTC-5:00
> To: [email protected]
> Subject: Re: [DISCUSS] KIP-1282: Prevent data loss during partition
> expansion for dynamically added partitions
>
> Hello Sikka,
>
> > If consumer restarts (app crash, bounce etc.) after dynamically adding
> > partitions it would consume unread messages from last committed offset
> > for existing partitions but would still miss the messages from new
> > partition.
>
> For dynamic consumers, a restart inherently means leaving and rejoining the
> group as a new member, so recalculating startupTimestamp = now() is
> semantically correct: the consumer is genuinely starting fresh.
>
> The gap you described only applies to static membership, where the consumer
> can restart without triggering a rebalance, yet the local timestamp still
> gets reset. For this scenario, as Chia suggested, we could extend the
> configuration to accept an explicit timestamp. This would allow users to pin
> a fixed reference point across restarts, effectively closing the gap for
> static membership.
> For dynamic consumers, the default by_start_time without an explicit
> timestamp already provides the correct behavior and a significant
> improvement over latest, which would miss data even without a restart.
>
> > If the offsets are deleted as mentioned in Scenario 2 (Log truncation),
> > how would this solution address that scenario?
>
> For the log truncation scenario, when segments are deleted and the
> consumer's committed offset becomes out of range, auto.offset.reset is
> triggered. With latest, the consumer simply jumps to the end of the
> partition, skipping all remaining available data. With by_start_time, the
> consumer looks up the position based on the startup timestamp rather than
> relying on offsets. Since the lookup is timestamp-based, it is not affected
> by offset invalidation due to truncation. Any data with timestamps at or
> after the startup time will still be found and consumed.
>
> > Do we need to care about clock skew or system time issues on the
> > consumer client side? Should we use a timestamp on the server/broker
> > side?
>
> Clock skew is a fair concern, but using a server-side timestamp does not
> necessarily make things safer. It would mean comparing the Group
> Coordinator's time against the Partition Leader's time, which are often
> different nodes. Without strict clock synchronization across the Kafka
> cluster, this "happens-before" relationship remains fundamentally
> unpredictable. On the other hand, auto.offset.reset is strictly a
> client-level configuration: consumers within the same group can
> intentionally use different policies. Tying the timestamp to a global
> server-side state would be a semantic mismatch. A local timestamp aligns
> much better with the client-level nature of this config.
>
> > Do you plan to have any metrics or observability when the consumer
> > resets offsets by_start_time?
>
> That's a great suggestion.
> I plan to expose the startup timestamp used by by_start_time as a
> client-level metric, so users can easily verify which reference point the
> consumer is using during debugging.
>
> Best Regards,
> Jiunn-Yang
>
> > Chia-Ping Tsai <[email protected]> wrote on Mar 6, 2026 at 10:44 PM:
> >
> > Hi Ryan,
> >
> > That is a fantastic point. A static member restarting and capturing a
> > newer local timestamp is definitely a critical edge case.
> >
> > Since users already need to inject a unique group.instance.id into the
> > configuration for static members, my idea is to allow the
> > auto.offset.reset policy to carry a dedicated timestamp to explicitly
> > "lock" the startup time (for example, using a format like
> > auto.offset.reset=startup:<timestamp>).
> >
> > This means that if users want to leverage this new policy with static
> > membership, their deployment configuration would simply include these two
> > specific injected values (the static ID and the locked timestamp).
> >
> > This approach keeps the configuration semantics at the member level and,
> > most importantly, avoids any need to update the RPC protocol.
> >
> > What do you think of this approach for the static membership scenario?
> >
> > On 2026/03/05 19:45:13 "Ryan Leslie (BLOOMBERG/ NEW YORK)" wrote:
> >> Hey Jiunn,
> >>
> >> Glad to see some progress around this issue.
> >>
> >> I had a similar thought to David, that if the time is only known client
> >> side there are still edge cases for data loss. One case is static
> >> membership, where from the perspective of a client they are free to
> >> restart their consumer task without actually having left or meaningfully
> >> affected the group. However, I think with the proposed implementation the
> >> timestamp is still reset here. So if the restart happens just after a
> >> partition is added and published to, but before the consumer metadata is
> >> refreshed, the group still runs the risk of data loss.
> >>
> >> It could be argued that keeping the group 'stable' is a requirement for
> >> this feature to work, but sometimes it's not possible to accomplish.
> >>
> >> From: [email protected] At: 03/05/26 14:32:20 UTC-5:00
> >> To: [email protected]
> >> Subject: Re: [DISCUSS] KIP-1282: Prevent data loss during partition
> >> expansion for dynamically added partitions
> >>
> >> Hi Jiunn,
> >>
> >> Thanks for the KIP!
> >>
> >> I was also considering this solution while we discussed in the jira. It
> >> seems to work in most of the cases but not in all. For instance, let’s
> >> imagine a partition created just before a new consumer joins or rejoins
> >> the group and this consumer gets the new partition. In this case, the
> >> consumer will have a start time which is older than the partition
> >> creation time. This could also happen with the truncation case. It makes
> >> the behavior kind of unpredictable again.
> >>
> >> Instead of relying on a local timestamp, one idea would be to rely on a
> >> timestamp provided by the server. For instance, we could define the time
> >> since the group became non-empty. This would define the subscription
> >> time for the consumer group. The downside is that it only works if the
> >> consumer is part of a group.
> >>
> >> In your missing semantic section, I don’t fully understand how the 4th
> >> point is improved by the KIP. It says start from earliest but with the
> >> change it would start from application start time. Could you elaborate?
> >>
> >> Best,
> >> David
> >>
> >> On Thu, Mar 5, 2026 at 12:47, 黃竣陽 <[email protected]> wrote:
> >>
> >>> Hello Chia,
> >>>
> >>> Thanks for your feedback; I have updated the KIP.
> >>>
> >>> Best Regards,
> >>> Jiunn-Yang
> >>>
> >>>> Chia-Ping Tsai <[email protected]> wrote on Mar 5, 2026 at 7:24 PM:
> >>>>
> >>>> hi Jiunn
> >>>>
> >>>> chia_00: Would you mind mentioning KAFKA-19236 in the KIP?
> >>>> It would be helpful to let readers know that "Dynamically at partition
> >>>> discovery" is being tracked as a separate issue.
> >>>>
> >>>> Best,
> >>>> Chia-Ping
> >>>>
> >>>> On 2026/03/05 11:14:31 黃竣陽 wrote:
> >>>>> Hello everyone,
> >>>>>
> >>>>> I would like to start a discussion on KIP-1282: Prevent data loss
> >>>>> during partition expansion for dynamically added partitions
> >>>>> <https://cwiki.apache.org/confluence/x/mIY8G>
> >>>>>
> >>>>> This proposal aims to introduce a new auto.offset.reset policy,
> >>>>> by_start_time, anchoring the offset reset to the consumer's startup
> >>>>> timestamp rather than partition discovery time, to prevent silent
> >>>>> data loss during partition expansion.
> >>>>>
> >>>>> Best regards,
> >>>>> Jiunn-Yang
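
For readers following the thread, here is a minimal sketch of how the creation-timestamp idea in the top message might be applied on the consumer side. The thread does not spell out the comparison rule, so the rule below is an assumption: a partition created at or after the group was created is read from the beginning, and an older partition falls back to "latest". The class, enum, and parameter names are hypothetical and do not exist in Kafka.

```java
/**
 * Hypothetical sketch only: Kafka has no such class, and neither
 * timestamp is exposed by any existing RPC.
 */
public final class CreationTimeResetSketch {

    /** The two existing reset targets the thread proposes choosing between. */
    public enum Reset { EARLIEST, LATEST }

    /**
     * Assumed rule (not stated in the thread): if the partition was created
     * at or after the group, the group has been subscribed for the whole
     * life of the partition, so nothing in it should be skipped.
     *
     * @param groupCreatedMs     hypothetical group creation time, epoch millis
     * @param partitionCreatedMs hypothetical partition creation time, epoch millis
     */
    public static Reset choose(long groupCreatedMs, long partitionCreatedMs) {
        return partitionCreatedMs >= groupCreatedMs ? Reset.EARLIEST : Reset.LATEST;
    }

    public static void main(String[] args) {
        long groupCreatedMs = 1_000L;
        // Partition added after the group existed: read it from the start.
        System.out.println(choose(groupCreatedMs, 2_000L)); // prints EARLIEST
        // Partition older than the group: keep the "latest" behavior.
        System.out.println(choose(groupCreatedMs, 500L));   // prints LATEST
    }
}
```

In this sketch the two timestamps would come from different nodes, so the comparison inherits the time-skew caveat already raised in the thread.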
