Hey Jun, Jason,

Thanks much for all the feedback. I have updated the KIP based on the
latest discussion. Could you check whether it looks good?

Thanks,
Dong

On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Hmm... thinking about this more, I am not sure that the proposed API is
> sufficient. For users that store offsets externally, we probably need an
> extra API to return the leader_epoch and partition_epoch for all partitions
> that consumers are consuming. I suppose these users currently use position() to
> get the offset. Thus we probably need a new method positionWithEpoch(..) to
> return <offset, partition_epoch, leader_epoch>. Does this sound reasonable?
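>
> Just to make this concrete, here is a rough sketch of what I have in mind
> (the names and the PositionAndEpoch type are hypothetical, nothing here is
> in the KIP yet):
>
>   import org.apache.kafka.common.TopicPartition;
>
>   // Hypothetical sketch only, not an actual interface in the client.
>   public interface EpochAwareConsumer {
>
>       // Value object holding the position plus the epochs needed to
>       // validate it later.
>       final class PositionAndEpoch {
>           public final long offset;
>           public final int partitionEpoch;
>           public final int leaderEpoch;
>
>           public PositionAndEpoch(long offset, int partitionEpoch, int leaderEpoch) {
>               this.offset = offset;
>               this.partitionEpoch = partitionEpoch;
>               this.leaderEpoch = leaderEpoch;
>           }
>       }
>
>       // Like position(), but also returns the partition_epoch and
>       // leader_epoch so that users who store offsets externally can
>       // persist all three together.
>       PositionAndEpoch positionWithEpoch(TopicPartition partition);
>   }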
>
> Thanks,
> Dong
>
>
> On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Dong,
>>
>> Yes, that's what I am thinking. OffsetEpoch will be composed of
>> (partition_epoch,
>> leader_epoch).
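>>
>> For example, something along these lines (just a sketch; the field types
>> and accessors are my assumption):
>>
>>   // Hypothetical sketch of the opaque OffsetEpoch value described above.
>>   public final class OffsetEpoch {
>>       private final int partitionEpoch;
>>       private final int leaderEpoch;
>>
>>       public OffsetEpoch(int partitionEpoch, int leaderEpoch) {
>>           this.partitionEpoch = partitionEpoch;
>>           this.leaderEpoch = leaderEpoch;
>>       }
>>
>>       public int partitionEpoch() { return partitionEpoch; }
>>
>>       public int leaderEpoch() { return leaderEpoch; }
>>   }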
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hey Jun,
>> >
>> > Thanks much. I like the new API that you proposed. I am not sure what
>> > exactly you mean by offset_epoch. I suppose that we can use the pair of
>> > (partition_epoch, leader_epoch) as the offset_epoch, right?
>> >
>> > Thanks,
>> > Dong
>> >
>> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Hi, Dong,
>> > >
>> > > Got it. The api that you proposed works. The question is whether
>> that's
>> > the
>> > > api that we want to have in the long term. My concern is that while
>> the
>> > api
>> > > change is simple, the new api seems harder to explain and use. For
>> > example,
>> > > a consumer storing offsets externally now needs to call
>> > > waitForMetadataUpdate() after calling seek().
>> > >
>> > > An alternative approach is to make the following compatible api
>> changes
>> > in
>> > > Consumer.
>> > > * Add an additional OffsetEpoch field in OffsetAndMetadata. (no need
>> to
>> > > change the CommitSync() api)
>> > > * Add a new api seek(TopicPartition partition, long offset,
>> OffsetEpoch
>> > > offsetEpoch). We can potentially deprecate the old api
>> > seek(TopicPartition
>> > > partition, long offset) in the future.
>> > >
>> > > The alternative approach has a similar amount of api changes as yours
>> > > but has the following benefits.
>> > > 1. The api works in a similar way to how offset management works now
>> > > and is probably what we want in the long term.
>> > > 2. It can reset offsets better when there is data loss due to unclean
>> > > leader election or correlated replica failure.
>> > > 3. It can reset offsets better when topic is recreated.
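>> > >
>> > > To make the usage concrete, here is a sketch of how a user storing
>> > > offsets externally might use the two api changes above (assuming the
>> > > OffsetEpoch type discussed earlier; the extra OffsetAndMetadata field
>> > > and the 3-arg seek() are the proposed apis, they do not exist today):
>> > >
>> > >   import java.util.Collections;
>> > >   import org.apache.kafka.clients.consumer.Consumer;
>> > >   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>> > >   import org.apache.kafka.common.TopicPartition;
>> > >
>> > >   public class ExternalOffsetSketch {
>> > >       static void restoreAndCommit(Consumer<byte[], byte[]> consumer,
>> > >                                    TopicPartition partition,
>> > >                                    long offset, OffsetEpoch epoch) {
>> > >           // Restore: with the epoch, the client can detect unclean leader
>> > >           // election or topic recreation and fall back to the reset policy
>> > >           // (proposed 3-arg seek, not in the current api).
>> > >           consumer.seek(partition, offset, epoch);
>> > >
>> > >           // Commit: OffsetAndMetadata carries the epoch alongside the
>> > >           // offset, so commitSync() keeps its current signature
>> > >           // (the 3-arg OffsetAndMetadata constructor is also proposed).
>> > >           consumer.commitSync(Collections.singletonMap(
>> > >               partition, new OffsetAndMetadata(offset, "", epoch)));
>> > >       }
>> > >   }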
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > >
>> > > > Hey Jun,
>> > > >
>> > > > Yeah I agree that ideally we don't want an ever growing global
>> metadata
>> > > > version. I just think it may be more desirable to keep the consumer
>> API
>> > > > simple.
>> > > >
>> > > > In my current proposal, the metadata version returned in the fetch
>> > > > response will be stored together with the offset. More specifically, the
>> > > > metadata_epoch in the new offset topic schema will be the largest
>> > > > metadata_epoch from all the MetadataResponse and FetchResponse ever
>> > > > received by this consumer.
>> > > >
>> > > > We probably don't have to change the consumer API for
>> > > > commitSync(Map<TopicPartition, OffsetAndMetadata>). If a user calls
>> > > > commitSync(...) to commit offset 10 for a given partition, for most
>> > > > use-cases this consumer instance should have consumed the message with
>> > > > offset 9 from this partition, in which case the consumer can remember
>> > > > and use the metadata_epoch from the corresponding FetchResponse when
>> > > > committing the offset. If a user calls commitSync(..) to commit offset 10
>> > > > for a given partition without having consumed the message with offset 9
>> > > > using this consumer instance, this is probably an advanced use-case. In
>> > > > this case the advanced user can retrieve the metadata_epoch using the
>> > > > newly added metadataEpoch() API after it fetches the message with offset
>> > > > 9 (probably from another consumer instance) and encode this
>> > > > metadata_epoch in the OffsetAndMetadata.metadata string. Do you think
>> > > > this solution would work?
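>> > > >
>> > > > Roughly, the advanced use-case could look like this (just a sketch;
>> > > > metadataEpoch() is the proposed api and the string encoding is
>> > > > entirely up to the user):
>> > > >
>> > > >   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>> > > >
>> > > >   public class EpochInMetadataSketch {
>> > > >       // Encode the metadata_epoch into the existing free-form metadata
>> > > >       // string, so commitSync() itself stays unchanged.
>> > > >       static OffsetAndMetadata withEpoch(long offset, int metadataEpoch) {
>> > > >           return new OffsetAndMetadata(offset, "metadata_epoch=" + metadataEpoch);
>> > > >       }
>> > > >
>> > > >       // Recover the metadata_epoch from a committed offset later.
>> > > >       static int metadataEpochOf(OffsetAndMetadata committed) {
>> > > >           return Integer.parseInt(
>> > > >               committed.metadata().substring("metadata_epoch=".length()));
>> > > >       }
>> > > >   }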
>> > > >
>> > > > By "not sure that I fully understand your latest suggestion", are
>> you
>> > > > referring to solution related to unclean leader election using
>> > > leader_epoch
>> > > > in my previous email?
>> > > >
>> > > > Thanks,
>> > > > Dong
>> > > >
>> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:
>> > > >
>> > > > > Hi, Dong,
>> > > > >
>> > > > > Not sure that I fully understand your latest suggestion. Returning an
>> > > > > ever-growing global metadata version itself is not ideal, but is fine. My
>> > > > > question is whether the metadata version returned in the fetch
>> > response
>> > > > > needs to be stored with the offset together if offsets are stored
>> > > > > externally. If so, we also have to change the consumer API for
>> > > > commitSync()
>> > > > > and need to worry about compatibility. If we don't store the
>> metadata
>> > > > > version together with the offset, on a consumer restart, it's not
>> > clear
>> > > > how
>> > > > > we can ensure the metadata in the consumer is high enough since
>> there
>> > > is
>> > > > no
>> > > > > metadata version to compare with.
>> > > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jun
>> > > > >
>> > > > >
>> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com>
>> > wrote:
>> > > > >
>> > > > > > Hey Jun,
>> > > > > >
>> > > > > > Thanks much for the explanation.
>> > > > > >
>> > > > > > I understand the advantage of partition_epoch over
>> metadata_epoch.
>> > My
>> > > > > > current concern is that the use of leader_epoch and the partition_epoch
>> > > > > > requires considerable changes to the consumer's public API to take care
>> > > > > > of the case where a user stores offsets externally. For example,
>> > > *consumer*.
>> > > > > > *commitSync*(..) would have to take a map whose value is
>> <offset,
>> > > > > metadata,
>> > > > > > leader epoch, partition epoch>. *consumer*.*seek*(...) would
>> also
>> > > need
>> > > > > > leader_epoch and partition_epoch as parameters. Technically we
>> can
>> > > > > probably
>> > > > > > still make it work in a backward compatible manner after careful
>> > > design
>> > > > > and
>> > > > > > discussion. But these changes can make the consumer's interface
>> > > > > > unnecessarily complex for most users who do not store offsets externally.
>> > > > > >
>> > > > > > After thinking more about it, we can address all problems
>> discussed
>> > > by
>> > > > > only
>> > > > > > using the metadata_epoch without introducing leader_epoch or the
>> > > > > > partition_epoch. The current KIP describes the changes to the
>> > > consumer
>> > > > > API
>> > > > > > and how the new API can be used if user stores offset
>> externally.
>> > In
>> > > > > order
>> > > > > > to address the scenario you described earlier, we can include
>> > > > > > metadata_epoch in the FetchResponse and the LeaderAndIsrRequest.
>> > > > > > The consumer remembers the largest metadata_epoch from all the
>> > > > > > FetchResponses it has
>> > > > > > received. The metadata_epoch committed with the offset, either
>> > within
>> > > > or
>> > > > > > outside Kafka, should be the largest metadata_epoch across all
>> > > > > > FetchResponse and MetadataResponse ever received by this
>> consumer.
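>> > > > > >
>> > > > > > The bookkeeping on the client side would be quite simple, roughly
>> > > > > > like this (a sketch; the metadata_epoch field in the responses is
>> > > > > > part of this proposal, not the current protocol):
>> > > > > >
>> > > > > >   // Tracks the largest metadata_epoch seen across all
>> > > > > >   // MetadataResponse and FetchResponse received by this consumer.
>> > > > > >   public class MetadataEpochTracker {
>> > > > > >       private int maxMetadataEpoch = -1;
>> > > > > >
>> > > > > >       // Called for every MetadataResponse or FetchResponse.
>> > > > > >       synchronized void onResponse(int metadataEpochInResponse) {
>> > > > > >           maxMetadataEpoch = Math.max(maxMetadataEpoch, metadataEpochInResponse);
>> > > > > >       }
>> > > > > >
>> > > > > >       // The value to commit together with the offset, either
>> > > > > >       // within or outside Kafka.
>> > > > > >       synchronized int metadataEpochToCommit() {
>> > > > > >           return maxMetadataEpoch;
>> > > > > >       }
>> > > > > >   }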
>> > > > > >
>> > > > > > The drawback of using only the metadata_epoch is that we cannot
>> > > > > > always do the smart offset reset in case of unclean leader election,
>> > > > > > which you mentioned earlier. But in most cases, unclean leader
>> > > > > > election probably happens when the consumer is not
>> > > > > > rebalancing/restarting. In these cases, either the consumer is not
>> > > > > > directly affected by unclean leader election since it is not
>> > > > > > consuming from the end of the log, or the consumer can derive the
>> > > > > > leader_epoch from the most recent message received before it sees
>> > > > > > OffsetOutOfRangeException. So I am not sure it is worth adding the
>> > > > > > leader_epoch to the consumer API to address the remaining corner
>> > > > > > case.
>> > > What
>> > > > > do
>> > > > > > you think?
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Dong
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io>
>> wrote:
>> > > > > >
>> > > > > > > Hi, Dong,
>> > > > > > >
>> > > > > > > Thanks for the reply.
>> > > > > > >
>> > > > > > > To solve the topic recreation issue, we could use either a
>> global
>> > > > > > metadata
>> > > > > > > version or a partition level epoch. But either one will be a
>> new
>> > > > > concept,
>> > > > > > > right? To me, the latter seems more natural. It also makes it
>> > > easier
>> > > > to
>> > > > > > > detect if a consumer's offset is still valid after a topic is
>> > > > > recreated.
>> > > > > > As
>> > > > > > > you pointed out, we don't need to store the partition epoch in
>> > the
>> > > > > > message.
>> > > > > > > The following is what I am thinking. When a partition is
>> created,
>> > > we
>> > > > > can
>> > > > > > > assign a partition epoch from an ever-increasing global
>> counter
>> > and
>> > > > > store
>> > > > > > > it in /brokers/topics/[topic]/partitions/[partitionId] in ZK.
>> > The
>> > > > > > > partition
>> > > > > > > epoch is propagated to every broker. The consumer will be
>> > tracking
>> > > a
>> > > > > > tuple
>> > > > > > > of <offset, leader epoch, partition epoch> for offsets. If a
>> > topic
>> > > is
>> > > > > > > recreated, it's possible that a consumer's offset and leader
>> > epoch
>> > > > > still
>> > > > > > > match that in the broker, but partition epoch won't be. In
>> this
>> > > case,
>> > > > > we
>> > > > > > > can potentially still treat the consumer's offset as out of
>> range
>> > > and
>> > > > > > reset
>> > > > > > > the offset based on the offset reset policy in the consumer.
>> This
>> > > > seems
>> > > > > > > harder to do with a global metadata version.
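>> > > > > > >
>> > > > > > > In code, the check could look roughly like this (a sketch; the
>> > > > > > > epoch values are part of the proposal, not the current protocol):
>> > > > > > >
>> > > > > > >   public class OffsetValiditySketch {
>> > > > > > >       // Decide whether a committed <offset, leader epoch, partition epoch>
>> > > > > > >       // tuple is still usable against the broker's current epochs.
>> > > > > > >       static boolean stillValid(int committedPartitionEpoch,
>> > > > > > >                                 int committedLeaderEpoch,
>> > > > > > >                                 int brokerPartitionEpoch,
>> > > > > > >                                 int brokerLeaderEpoch) {
>> > > > > > >           // A different partition epoch means the topic was recreated,
>> > > > > > >           // so the offset is treated as out of range and the reset
>> > > > > > >           // policy applies.
>> > > > > > >           if (committedPartitionEpoch != brokerPartitionEpoch)
>> > > > > > >               return false;
>> > > > > > >           // Otherwise fall back to the leader epoch check (simplified here).
>> > > > > > >           return committedLeaderEpoch <= brokerLeaderEpoch;
>> > > > > > >       }
>> > > > > > >   }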
>> > > > > > >
>> > > > > > > Jun
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <
>> lindon...@gmail.com>
>> > > > wrote:
>> > > > > > >
>> > > > > > > > Hey Jun,
>> > > > > > > >
>> > > > > > > > This is a very good example. After thinking through this in
>> > > > detail, I
>> > > > > > > agree
>> > > > > > > > that we need to commit offset with leader epoch in order to
>> > > address
>> > > > > > this
>> > > > > > > > example.
>> > > > > > > >
>> > > > > > > > I think the remaining question is how to address the
>> scenario
>> > > that
>> > > > > the
>> > > > > > > > topic is deleted and re-created. One possible solution is to
>> > > commit
>> > > > > > > offset
>> > > > > > > > with both the leader epoch and the metadata version. The logic and
>> > > > > > > > the implementation of this solution do not require a new concept (e.g.
>> > > > > > > > partition epoch) and it does not require any change to the
>> > > message
>> > > > > > format
>> > > > > > > > or leader epoch. It also allows us to order the metadata in
>> a
>> > > > > > > > straightforward manner which may be useful in the future.
>> So it
>> > > may
>> > > > > be
>> > > > > > a
>> > > > > > > > better solution than generating a random partition epoch
>> every
>> > > time
>> > > > > we
>> > > > > > > > create a partition. Does this sound reasonable?
>> > > > > > > >
>> > > > > > > > Previously one concern with using the metadata version is that the
>> > > > > > > > consumer will be forced to refresh metadata even if the metadata
>> > > > > > > > version is increased due to topics that the consumer is not
>> > > > > > > > interested in. Now I realized that this is probably not a problem.
>> > > > > > > > Currently the client will refresh metadata either due to
>> > > > > > > > InvalidMetadataException in the response from the broker or due to
>> > > > > > > > metadata expiry. The addition of the metadata version should not
>> > > > > > > > increase the overhead of metadata refresh caused by
>> > > > > > > > InvalidMetadataException. If the client refreshes metadata due to
>> > > > > > > > expiry and receives metadata whose version is lower than the
>> > > > > > > > current metadata version, we can reject the metadata but still
>> > > > > > > > reset the metadata age, which essentially keeps the existing
>> > > > > > > > behavior in the client.
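>> > > > > > > >
>> > > > > > > > Something like the following on the client side (a sketch; the
>> > > > > > > > version field and the names are illustrative only):
>> > > > > > > >
>> > > > > > > >   public class VersionedMetadataSketch {
>> > > > > > > >       private int currentVersion = -1;
>> > > > > > > >       // Read by the periodic-expiry logic (not shown here).
>> > > > > > > >       private long lastRefreshMs = 0L;
>> > > > > > > >
>> > > > > > > >       // A response with a lower version is rejected, but the
>> > > > > > > >       // metadata age is still reset so the client does not
>> > > > > > > >       // immediately refresh again, which keeps the existing
>> > > > > > > >       // expiry behavior.
>> > > > > > > >       synchronized void update(int responseVersion, long nowMs) {
>> > > > > > > >           lastRefreshMs = nowMs;
>> > > > > > > >           if (responseVersion < currentVersion)
>> > > > > > > >               return;          // ignore the stale metadata itself
>> > > > > > > >           currentVersion = responseVersion;
>> > > > > > > >           // ... apply the new metadata here ...
>> > > > > > > >       }
>> > > > > > > >   }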
>> > > > > > > >
>> > > > > > > > Thanks much,
>> > > > > > > > Dong
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
