Hey Jun, Jason,

Thanks much for all the feedback. I have updated the KIP based on the latest discussion. Could you check whether it looks good?

Thanks,
Dong

On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Hmm... thinking about this more, I am not sure that the proposed API is sufficient. For users that store offsets externally, we probably need an extra API to return the leader_epoch and partition_epoch for all partitions that consumers are consuming. I suppose these users currently use position() to get the offset. Thus we probably need a new method positionWithEpoch(..) to return <offset, partition_epoch, leader_epoch>. Does this sound reasonable?

Thanks,
Dong

On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Yes, that's what I am thinking. OffsetEpoch will be composed of (partition_epoch, leader_epoch).

Thanks,

Jun

On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much. I like the new API that you proposed. I am not sure what you exactly mean by offset_epoch. I suppose that we can use the pair of (partition_epoch, leader_epoch) as the offset_epoch, right?

Thanks,
Dong

On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Got it. The API that you proposed works. The question is whether that's the API that we want to have in the long term. My concern is that while the API change is simple, the new API seems harder to explain and use. For example, a consumer storing offsets externally now needs to call waitForMetadataUpdate() after calling seek().

An alternative approach is to make the following compatible API changes in Consumer:
* Add an additional OffsetEpoch field in OffsetAndMetadata. (No need to change the commitSync() API.)
* Add a new API seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch). We can potentially deprecate the old API seek(TopicPartition partition, long offset) in the future.

The alternative approach has a similar amount of API changes as yours but has the following benefits:
1. The API works in a similar way to how offset management works now and is probably what we want in the long term.
2. It can reset offsets better when there is data loss due to unclean leader election or correlated replica failure.
3. It can reset offsets better when a topic is recreated.

Thanks,

Jun
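
[Editor's note: to make the shape of this alternative concrete, here is a minimal Java sketch. The OffsetEpoch class, its fields, and the exact signatures are illustrative assumptions based on the discussion above, not final KIP definitions.]

    import org.apache.kafka.common.TopicPartition;

    // Sketch of the proposed OffsetEpoch: the pair (partition_epoch, leader_epoch).
    public final class OffsetEpoch {
        private final int partitionEpoch; // assigned when the partition is created
        private final int leaderEpoch;    // bumped on every leader change

        public OffsetEpoch(int partitionEpoch, int leaderEpoch) {
            this.partitionEpoch = partitionEpoch;
            this.leaderEpoch = leaderEpoch;
        }

        public int partitionEpoch() { return partitionEpoch; }
        public int leaderEpoch() { return leaderEpoch; }
    }

    // Sketch of the two compatible consumer changes described above.
    interface EpochAwareConsumer {
        // Existing API, which could eventually be deprecated:
        void seek(TopicPartition partition, long offset);

        // New overload carrying the epoch alongside the offset:
        void seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch);
    }

A consumer storing offsets externally would persist the OffsetEpoch next to each offset and pass it back through the new seek() overload on restart, instead of calling seek() followed by waitForMetadataUpdate().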

On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Yeah, I agree that ideally we don't want an ever-growing global metadata version. I just think it may be more desirable to keep the consumer API simple.

In my current proposal, the metadata version returned in the fetch response will be stored together with the offset. More specifically, the metadata_epoch in the new offset topic schema will be the largest metadata_epoch from all the MetadataResponses and FetchResponses ever received by this consumer.

We probably don't have to change the consumer API for commitSync(Map<TopicPartition, OffsetAndMetadata>). If a user calls commitSync(...) to commit offset 10 for a given partition, in most use cases this consumer instance should have consumed the message with offset 9 from that partition, in which case the consumer can remember and use the metadata_epoch from the corresponding FetchResponse when committing the offset. If a user calls commitSync(..) to commit offset 10 for a given partition without having consumed the message with offset 9 using this consumer instance, this is probably an advanced use case. In this case the advanced user can retrieve the metadata_epoch using the newly added metadataEpoch() API after it fetches the message with offset 9 (probably from another consumer instance) and encode this metadata_epoch in the string OffsetAndMetadata.metadata. Do you think this solution would work?

By "not sure that I fully understand your latest suggestion", are you referring to the solution related to unclean leader election using leader_epoch in my previous email?

Thanks,
Dong
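
[Editor's note: a sketch of the advanced use case above, where the externally derived epoch rides in the free-form metadata string of an offset commit. The metadataEpoch value would come from the proposed metadataEpoch() API, which does not exist in the current consumer; the helper class, encoding, and the hardcoded offset 10 from the example are illustrative only.]

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ExternalEpochCommit {
        // metadataEpoch is obtained out of band, e.g. from the consumer
        // instance that originally fetched the message with offset 9.
        static void commitWithEpoch(KafkaConsumer<String, String> consumer,
                                    TopicPartition tp, long metadataEpoch) {
            // Encode the epoch in the existing free-form metadata string so
            // it travels with the committed offset; commitSync() itself is
            // unchanged.
            consumer.commitSync(Collections.singletonMap(
                tp, new OffsetAndMetadata(10L, "metadata_epoch=" + metadataEpoch)));
        }
    }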

On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Not sure that I fully understand your latest suggestion. Returning an ever-growing global metadata version itself is not ideal, but is fine. My question is whether the metadata version returned in the fetch response needs to be stored together with the offset if offsets are stored externally. If so, we also have to change the consumer API for commitSync() and need to worry about compatibility. If we don't store the metadata version together with the offset, on a consumer restart it's not clear how we can ensure the metadata in the consumer is high enough, since there is no metadata version to compare with.

Thanks,

Jun

On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

Thanks much for the explanation.

I understand the advantage of partition_epoch over metadata_epoch. My current concern is that the use of leader_epoch and partition_epoch requires considerable change to the consumer's public API to take care of the case where the user stores offsets externally. For example, consumer.commitSync(..) would have to take a map whose value is <offset, metadata, leader epoch, partition epoch>. consumer.seek(...) would also need leader_epoch and partition_epoch as parameters. Technically we can probably still make it work in a backward compatible manner after careful design and discussion. But these changes can make the consumer's interface unnecessarily complex for the many users who do not store offsets externally.

After thinking more about it, we can address all the problems discussed by using only the metadata_epoch, without introducing leader_epoch or partition_epoch. The current KIP describes the changes to the consumer API and how the new API can be used if the user stores offsets externally. In order to address the scenario you described earlier, we can include the metadata_epoch in the FetchResponse and the LeaderAndIsrRequest. The consumer remembers the largest metadata_epoch from all the FetchResponses it has received. The metadata_epoch committed with the offset, either within or outside Kafka, should be the largest metadata_epoch across all FetchResponses and MetadataResponses ever received by this consumer.

The drawback of using only the metadata_epoch is that we cannot always do the smart offset reset in the case of unclean leader election, which you mentioned earlier. But in most cases, unclean leader election probably happens when the consumer is not rebalancing/restarting. In these cases, either the consumer is not directly affected by the unclean leader election since it is not consuming from the end of the log, or the consumer can derive the leader_epoch from the most recent message received before it sees OffsetOutOfRangeException. So I am not sure it is worth adding the leader_epoch to the consumer API to address the remaining corner case. What do you think?

Thanks,
Dong

On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:

Hi, Dong,

Thanks for the reply.

To solve the topic recreation issue, we could use either a global metadata version or a partition-level epoch. But either one will be a new concept, right? To me, the latter seems more natural. It also makes it easier to detect if a consumer's offset is still valid after a topic is recreated. As you pointed out, we don't need to store the partition epoch in the message. The following is what I am thinking. When a partition is created, we can assign it a partition epoch from an ever-increasing global counter and store it in /brokers/topics/[topic]/partitions/[partitionId] in ZK. The partition epoch is propagated to every broker. The consumer will be tracking a tuple of <offset, leader epoch, partition epoch> for offsets. If a topic is recreated, it's possible that a consumer's offset and leader epoch still match those in the broker, but the partition epoch won't. In this case, we can potentially still treat the consumer's offset as out of range and reset the offset based on the offset reset policy in the consumer. This seems harder to do with a global metadata version.

Jun
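
[Editor's note: a rough sketch of the per-partition state a consumer would track under this partition-epoch approach. The class and member names are illustrative only; nothing here exists in the current client.]

    // The tuple a consumer keeps per partition: <offset, leader epoch, partition epoch>.
    public final class CommittedPosition {
        final long offset;
        final int leaderEpoch;    // leader epoch of the last consumed record
        final int partitionEpoch; // from /brokers/topics/[topic]/partitions/[partitionId]

        CommittedPosition(long offset, int leaderEpoch, int partitionEpoch) {
            this.offset = offset;
            this.leaderEpoch = leaderEpoch;
            this.partitionEpoch = partitionEpoch;
        }

        // If the broker reports a different partition epoch, the topic was
        // recreated: treat the stored offset as out of range and fall back to
        // the offset reset policy, even if the offset and leader epoch happen
        // to look valid against the recreated topic.
        boolean isValidAgainst(int brokerPartitionEpoch) {
            return this.partitionEpoch == brokerPartitionEpoch;
        }
    }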

On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Jun,

This is a very good example. After thinking through this in detail, I agree that we need to commit the offset with the leader epoch in order to address this example.

I think the remaining question is how to address the scenario where the topic is deleted and re-created. One possible solution is to commit the offset with both the leader epoch and the metadata version. The logic and the implementation of this solution do not require a new concept (e.g. partition epoch), and it does not require any change to the message format or leader epoch. It also allows us to order the metadata in a straightforward manner, which may be useful in the future. So it may be a better solution than generating a random partition epoch every time we create a partition. Does this sound reasonable?

Previously, one concern with using the metadata version was that the consumer would be forced to refresh metadata even if the metadata version is increased due to topics that the consumer is not interested in. Now I realize that this is probably not a problem. Currently the client will refresh metadata either due to an InvalidMetadataException in the response from the broker or due to metadata expiry. The addition of the metadata version should not increase the overhead of metadata refreshes caused by InvalidMetadataException. If the client refreshes metadata due to expiry and it receives a metadata response whose version is lower than the current metadata version, we can reject the metadata but still reset the metadata age, which essentially keeps the existing behavior in the client.

Thanks much,
Dong
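
[Editor's note: a small sketch of the client-side acceptance logic described in the last paragraph. The class, fields, and update() method are illustrative and do not correspond to the real org.apache.kafka.clients.Metadata internals.]

    import org.apache.kafka.common.Cluster;

    // Tracks cluster metadata together with its version and age.
    public class VersionedMetadata {
        private long currentVersion = -1;
        private long lastRefreshMs;
        private Cluster cluster;

        // Called with the version and cluster info from a metadata response.
        synchronized void update(long responseVersion, Cluster newCluster, long nowMs) {
            // Always reset the age, even for a stale response, so the refresh
            // cadence matches the existing client behavior.
            this.lastRefreshMs = nowMs;
            if (responseVersion < currentVersion) {
                return; // stale metadata: reject it, keep what we have
            }
            this.currentVersion = responseVersion;
            this.cluster = newCluster;
        }
    }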