Hey Jun,

Hmm... thinking about this more, I am not sure that the proposed API is sufficient. For users that store offsets externally, we probably need an extra API to return the leader_epoch and partition_epoch for all partitions that consumers are consuming. I suppose these users currently use position() to get the offset. Thus we probably need a new method positionWithEpoch(..) to return <offset, partition_epoch, leader_epoch>. Does this sound reasonable?
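To make the proposal concrete, here is a rough Python sketch of the external-offset-storage round trip that the proposed positionWithEpoch(..) would enable. All class and function names here are illustrative only, not the actual Kafka consumer API:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class PositionWithEpoch:
    """What the proposed positionWithEpoch(..) would return per partition."""
    offset: int
    partition_epoch: int
    leader_epoch: int

class ExternalOffsetStore:
    """Toy external store keyed by (topic, partition)."""
    def __init__(self):
        self._store = {}

    def save(self, topic, partition, pos):
        self._store[(topic, partition)] = pos

    def load(self, topic, partition):
        return self._store[(topic, partition)]

# After processing a batch, a consumer that stores offsets externally would
# persist all three values, not just the offset:
store = ExternalOffsetStore()
store.save("orders", 0, PositionWithEpoch(offset=10, partition_epoch=3, leader_epoch=7))

# On restart it would load them back and seek with the epochs, so stale
# metadata (leader_epoch) or topic recreation (partition_epoch) can be detected:
pos = store.load("orders", 0)
```
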
Thanks,
Dong

On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Yes, that's what I am thinking. OffsetEpoch will be composed of (partition_epoch, leader_epoch).
>
> Thanks,
>
> Jun
>
> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > Thanks much. I like the new API that you proposed. I am not sure what you exactly mean by offset_epoch. I suppose that we can use the pair of (partition_epoch, leader_epoch) as the offset_epoch, right?
> >
> > Thanks,
> > Dong
> >
> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Dong,
> > >
> > > Got it. The API that you proposed works. The question is whether that's the API that we want to have in the long term. My concern is that while the API change is simple, the new API seems harder to explain and use. For example, a consumer storing offsets externally now needs to call waitForMetadataUpdate() after calling seek().
> > >
> > > An alternative approach is to make the following compatible API changes in Consumer:
> > > * Add an additional OffsetEpoch field in OffsetAndMetadata. (No need to change the commitSync() API.)
> > > * Add a new API seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch). We can potentially deprecate the old API seek(TopicPartition partition, long offset) in the future.
> > >
> > > The alternative approach has a similar amount of API changes as yours but has the following benefits:
> > > 1. The API works in a similar way to how offset management works now and is probably what we want in the long term.
> > > 2. It can reset offsets better when there is data loss due to unclean leader election or correlated replica failure.
> > > 3. It can reset offsets better when a topic is recreated.
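Jun's alternative above could look roughly like the following Python model. OffsetEpoch and the three-argument seek are hypothetical, and the comparison order (partition_epoch first, then leader_epoch) is my own assumption about how staleness would be decided:

```python
from functools import total_ordering

@total_ordering
class OffsetEpoch:
    """Composed of (partition_epoch, leader_epoch), as discussed above."""
    def __init__(self, partition_epoch, leader_epoch):
        self.partition_epoch = partition_epoch
        self.leader_epoch = leader_epoch

    def _key(self):
        # partition_epoch dominates: a recreated topic invalidates
        # everything, regardless of the leader epoch.
        return (self.partition_epoch, self.leader_epoch)

    def __eq__(self, other):
        return self._key() == other._key()

    def __lt__(self, other):
        return self._key() < other._key()

def seek(consumer_state, partition, offset, offset_epoch=None):
    """Model of the new seek(partition, offset, offsetEpoch). The old
    two-argument form still works, so the change stays compatible."""
    consumer_state[partition] = (offset, offset_epoch)

state = {}
seek(state, ("orders", 0), 10, OffsetEpoch(3, 7))  # new three-argument API
seek(state, ("orders", 1), 0)                      # old API, still valid
```
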
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > Yeah I agree that ideally we don't want an ever-growing global metadata version. I just think it may be more desirable to keep the consumer API simple.
> > > >
> > > > In my current proposal, the metadata version returned in the fetch response will be stored together with the offset. More specifically, the metadata_epoch in the new offset topic schema will be the largest metadata_epoch from all the MetadataResponses and FetchResponses ever received by this consumer.
> > > >
> > > > We probably don't have to change the consumer API for commitSync(Map<TopicPartition, OffsetAndMetadata>). If the user calls commitSync(...) to commit offset 10 for a given partition, then for most use-cases this consumer instance should have consumed the message with offset 9 from this partition, in which case the consumer can remember and use the metadata_epoch from the corresponding FetchResponse when committing the offset. If the user calls commitSync(..) to commit offset 10 for a given partition without having consumed the message with offset 9 using this consumer instance, this is probably an advanced use-case. In this case the advanced user can retrieve the metadata_epoch using the newly added metadataEpoch() API after it fetches the message with offset 9 (probably from another consumer instance) and encode this metadata_epoch in the string OffsetAndMetadata.metadata. Do you think this solution would work?
> > > >
> > > > By "not sure that I fully understand your latest suggestion", are you referring to the solution related to unclean leader election using leader_epoch in my previous email?
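The bookkeeping Dong describes, where the consumer remembers the largest metadata_epoch seen across all responses and commits it alongside the offset, can be sketched like this (names are illustrative, not actual Kafka client internals):

```python
class EpochTracker:
    """Tracks the largest metadata_epoch across all MetadataResponse
    and FetchResponse messages this consumer has received."""
    def __init__(self):
        self.max_metadata_epoch = -1

    def on_response(self, metadata_epoch):
        # Called for both metadata and fetch responses; only the max matters.
        self.max_metadata_epoch = max(self.max_metadata_epoch, metadata_epoch)

    def commit_payload(self, partition, offset):
        # The committed record carries the largest epoch seen so far, so a
        # restarted consumer knows how fresh its metadata needs to be.
        return {"partition": partition, "offset": offset,
                "metadata_epoch": self.max_metadata_epoch}

tracker = EpochTracker()
tracker.on_response(5)  # from a MetadataResponse
tracker.on_response(8)  # from a FetchResponse
tracker.on_response(7)  # older response arrives late; the max is kept
payload = tracker.commit_payload(("orders", 0), 10)
```
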
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > > Hi, Dong,
> > > > >
> > > > > Not sure that I fully understand your latest suggestion. Returning an ever-growing global metadata version itself is not ideal, but is fine. My question is whether the metadata version returned in the fetch response needs to be stored together with the offset if offsets are stored externally. If so, we also have to change the consumer API for commitSync() and need to worry about compatibility. If we don't store the metadata version together with the offset, then on a consumer restart it's not clear how we can ensure the metadata in the consumer is high enough, since there is no metadata version to compare with.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > >
> > > > > > Hey Jun,
> > > > > >
> > > > > > Thanks much for the explanation.
> > > > > >
> > > > > > I understand the advantage of partition_epoch over metadata_epoch. My current concern is that the use of leader_epoch and partition_epoch requires considerable changes to the consumer's public API to take care of the case where the user stores offsets externally. For example, consumer.commitSync(..) would have to take a map whose value is <offset, metadata, leader epoch, partition epoch>. consumer.seek(...) would also need leader_epoch and partition_epoch as parameters. Technically we can probably still make it work in a backward compatible manner after careful design and discussion.
> > > > > > But these changes can make the consumer's interface unnecessarily complex for most users who do not store offsets externally.
> > > > > >
> > > > > > After thinking more about it, we can address all the problems discussed by using only the metadata_epoch, without introducing leader_epoch or partition_epoch. The current KIP describes the changes to the consumer API and how the new API can be used if the user stores offsets externally. In order to address the scenario you described earlier, we can include metadata_epoch in the FetchResponse and the LeaderAndIsrRequest. The consumer remembers the largest metadata_epoch from all the FetchResponses it has received. The metadata_epoch committed with the offset, either within or outside Kafka, should be the largest metadata_epoch across all FetchResponses and MetadataResponses ever received by this consumer.
> > > > > >
> > > > > > The drawback of using only the metadata_epoch is that we cannot always do the smart offset reset in case of the unclean leader election which you mentioned earlier. But in most cases, unclean leader election probably happens when the consumer is not rebalancing/restarting. In these cases, either the consumer is not directly affected by the unclean leader election since it is not consuming from the end of the log, or the consumer can derive the leader_epoch from the most recent message received before it sees OffsetOutOfRangeException. So I am not sure it is worth adding the leader_epoch to the consumer API to address the remaining corner case. What do you think?
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > >
> > > > > > > Hi, Dong,
> > > > > > >
> > > > > > > Thanks for the reply.
> > > > > > >
> > > > > > > To solve the topic recreation issue, we could use either a global metadata version or a partition-level epoch. But either one will be a new concept, right? To me, the latter seems more natural. It also makes it easier to detect if a consumer's offset is still valid after a topic is recreated. As you pointed out, we don't need to store the partition epoch in the message. The following is what I am thinking. When a partition is created, we can assign a partition epoch from an ever-increasing global counter and store it in /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition epoch is propagated to every broker. The consumer will be tracking a tuple of <offset, leader epoch, partition epoch> for offsets. If a topic is recreated, it's possible that a consumer's offset and leader epoch still match those in the broker, but the partition epoch won't. In this case, we can potentially still treat the consumer's offset as out of range and reset the offset based on the offset reset policy in the consumer. This seems harder to do with a global metadata version.
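Jun's partition-epoch scheme above can be modeled in a few lines of Python. The counter, the ZK path layout, and the validity check are sketched under my own assumptions about how the comparison would work; none of this is actual broker code:

```python
import itertools

# Ever-increasing global counter, as would be kept in ZooKeeper.
_epoch_counter = itertools.count(1)

zk = {}  # stands in for ZK: path -> partition_epoch

def create_partition(topic, partition):
    """On partition creation, assign the next epoch and store it in ZK."""
    epoch = next(_epoch_counter)
    zk[f"/brokers/topics/{topic}/partitions/{partition}"] = epoch
    return epoch

def offset_valid(broker_partition_epoch, consumer_partition_epoch):
    """A committed offset is only trusted if the partition epoch matches.
    After topic recreation the epochs differ even when offset and leader
    epoch happen to line up, so the consumer falls back to its reset policy."""
    return broker_partition_epoch == consumer_partition_epoch

e1 = create_partition("orders", 0)  # original topic
consumer_epoch = e1                 # consumer commits <offset, leader epoch, e1>
e2 = create_partition("orders", 0)  # topic deleted and recreated
```
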
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hey Jun,
> > > > > > > >
> > > > > > > > This is a very good example. After thinking through this in detail, I agree that we need to commit the offset with the leader epoch in order to address this example.
> > > > > > > >
> > > > > > > > I think the remaining question is how to address the scenario where the topic is deleted and re-created. One possible solution is to commit the offset with both the leader epoch and the metadata version. The logic and the implementation of this solution do not require a new concept (e.g. partition epoch) and do not require any change to the message format or leader epoch. It also allows us to order the metadata in a straightforward manner, which may be useful in the future. So it may be a better solution than generating a random partition epoch every time we create a partition. Does this sound reasonable?
> > > > > > > >
> > > > > > > > Previously one concern with using the metadata version was that the consumer would be forced to refresh metadata even if the metadata version is increased due to topics that the consumer is not interested in. Now I realize that this is probably not a problem. Currently the client will refresh metadata either due to an InvalidMetadataException in the response from the broker or due to metadata expiry.
> > > > > > > > The addition of the metadata version should not increase the overhead of metadata refresh caused by InvalidMetadataException. If the client refreshes metadata due to expiry and it receives a metadata response whose version is lower than the current metadata version, we can reject the metadata but still reset the metadata age, which essentially keeps the existing behavior in the client.
> > > > > > > >
> > > > > > > > Thanks much,
> > > > > > > > Dong
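The refresh behavior Dong describes, rejecting a metadata response with a lower version while still resetting the metadata age so the client does not retry in a tight loop, could look roughly like this (an illustrative model, not the actual Kafka client code):

```python
import time

class MetadataCache:
    """Toy model of client-side metadata with an age-based refresh timer."""
    def __init__(self, expiry_secs=300):
        self.version = -1
        self.topics = {}
        self.expiry_secs = expiry_secs
        self.last_refresh = time.monotonic()

    def needs_refresh(self):
        return time.monotonic() - self.last_refresh > self.expiry_secs

    def on_metadata_response(self, version, topics):
        """Accept only metadata at least as new as what we already have,
        but reset the age either way, which keeps the existing refresh
        cadence of the client."""
        accepted = version >= self.version
        if accepted:
            self.version = version
            self.topics = topics
        self.last_refresh = time.monotonic()  # reset age even on rejection
        return accepted

cache = MetadataCache()
cache.on_metadata_response(5, {"orders": 3})
stale_accepted = cache.on_metadata_response(4, {"orders": 1})  # stale: rejected
```
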