Hi, Dong,

Yes, that's what I am thinking. OffsetEpoch will be composed of
(partition_epoch, leader_epoch).
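
To make that concrete, here is a rough sketch of what such a type could look
like (purely illustrative; field names and ordering semantics are not final):

// Sketch only, not the final API. OffsetEpoch simply pairs the
// partition_epoch with the leader_epoch; comparing partition_epoch first
// (an assumption here) would let a recreated partition invalidate older offsets.
public class OffsetEpoch implements Comparable<OffsetEpoch> {
    private final int partitionEpoch;
    private final int leaderEpoch;

    public OffsetEpoch(int partitionEpoch, int leaderEpoch) {
        this.partitionEpoch = partitionEpoch;
        this.leaderEpoch = leaderEpoch;
    }

    public int partitionEpoch() { return partitionEpoch; }
    public int leaderEpoch() { return leaderEpoch; }

    @Override
    public int compareTo(OffsetEpoch other) {
        int cmp = Integer.compare(partitionEpoch, other.partitionEpoch);
        return cmp != 0 ? cmp : Integer.compare(leaderEpoch, other.leaderEpoch);
    }
}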

Thanks,

Jun


On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Thanks much. I like the new API that you proposed. I am not sure what
> exactly you mean by offset_epoch. I suppose that we can use the pair of
> (partition_epoch, leader_epoch) as the offset_epoch, right?
>
> Thanks,
> Dong
>
> On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Dong,
> >
> > Got it. The api that you proposed works. The question is whether that's
> > the api that we want to have in the long term. My concern is that while
> > the api change is simple, the new api seems harder to explain and use.
> > For example, a consumer storing offsets externally now needs to call
> > waitForMetadataUpdate() after calling seek().
> >
> > An alternative approach is to make the following compatible api changes
> > in Consumer.
> > * Add an additional OffsetEpoch field in OffsetAndMetadata. (no need to
> > change the commitSync() api)
> > * Add a new api seek(TopicPartition partition, long offset, OffsetEpoch
> > offsetEpoch). We can potentially deprecate the old api
> > seek(TopicPartition partition, long offset) in the future.
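> >
> > To make the shape concrete, here is a rough sketch of the additions
> > (illustrative only, not the final API; OffsetEpoch is the
> > (partition_epoch, leader_epoch) pair discussed in this thread):
> >
> > import java.util.Map;
> > import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> > import org.apache.kafka.common.TopicPartition;
> >
> > // Sketch only: OffsetEpoch is the proposed new type described above.
> > public interface EpochAwareConsumerApi {
> >     // existing api; could eventually be deprecated
> >     void seek(TopicPartition partition, long offset);
> >
> >     // new api: seek using the epoch that was stored alongside the offset
> >     void seek(TopicPartition partition, long offset, OffsetEpoch offsetEpoch);
> >
> >     // commitSync() itself is unchanged; OffsetAndMetadata just gains an
> >     // optional OffsetEpoch field that is persisted with the offset
> >     void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
> > }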
> >
> > The alternative approach has a similar amount of api changes as yours,
> > but it has the following benefits.
> > 1. The api works in a similar way to how offset management works now and
> > is probably what we want in the long term.
> > 2. It can reset offsets better when there is data loss due to unclean
> > leader election or correlated replica failure.
> > 3. It can reset offsets better when the topic is recreated.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jun,
> > >
> > > Yeah I agree that ideally we don't want an ever growing global metadata
> > > version. I just think it may be more desirable to keep the consumer API
> > > simple.
> > >
> > > In my current proposal, the metadata version returned in the fetch
> > > response will be stored together with the offset. More specifically,
> > > the metadata_epoch in the new offset topic schema will be the largest
> > > metadata_epoch from all the MetadataResponses and FetchResponses ever
> > > received by this consumer.
> > >
> > > We probably don't have to change the consumer API for
> > > commitSync(Map<TopicPartition, OffsetAndMetadata>). If the user calls
> > > commitSync(...) to commit offset 10 for a given partition, then for most
> > > use-cases this consumer instance should have consumed the message with
> > > offset 9 from this partition, in which case the consumer can remember
> > > and use the metadata_epoch from the corresponding FetchResponse when
> > > committing the offset. If the user calls commitSync(...) to commit
> > > offset 10 for a given partition without having consumed the message
> > > with offset 9 using this consumer instance, this is probably an
> > > advanced use-case. In this case the advanced user can retrieve the
> > > metadata_epoch using the newly added metadataEpoch() API after it
> > > fetches the message with offset 9 (probably from another consumer
> > > instance) and encode this metadata_epoch in the string
> > > OffsetAndMetadata.metadata. Do you think this solution would work?
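> > >
> > > For the advanced use-case, the commit could look roughly like this
> > > (illustrative sketch; the metadataEpoch value would come from the other
> > > consumer instance's metadataEpoch() call, which is the API proposed in
> > > the KIP):
> > >
> > > import java.util.Collections;
> > > import org.apache.kafka.clients.consumer.KafkaConsumer;
> > > import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> > > import org.apache.kafka.common.TopicPartition;
> > >
> > > // Sketch only: commit an offset while encoding the metadata_epoch in the
> > > // free-form metadata string of OffsetAndMetadata.
> > > class EpochEncodingCommit {
> > >     static void commitWithEpoch(KafkaConsumer<String, String> consumer,
> > >                                 TopicPartition tp, long offset,
> > >                                 long metadataEpoch) {
> > >         String metadata = "metadata_epoch=" + metadataEpoch;
> > >         consumer.commitSync(
> > >             Collections.singletonMap(tp, new OffsetAndMetadata(offset, metadata)));
> > >     }
> > > }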
> > >
> > > By "not sure that I fully understand your latest suggestion", are you
> > > referring to solution related to unclean leader election using
> > leader_epoch
> > > in my previous email?
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Dong,
> > > >
> > > > Not sure that I fully understand your latest suggestion. Returning an
> > > > ever-growing global metadata version itself is not ideal, but is fine.
> > > > My question is whether the metadata version returned in the fetch
> > > > response needs to be stored together with the offset if offsets are
> > > > stored externally. If so, we also have to change the consumer API for
> > > > commitSync() and need to worry about compatibility. If we don't store
> > > > the metadata version together with the offset, then on a consumer
> > > > restart it's not clear how we can ensure the metadata in the consumer
> > > > is recent enough, since there is no metadata version to compare with.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > > Hey Jun,
> > > > >
> > > > > Thanks much for the explanation.
> > > > >
> > > > > I understand the advantage of partition_epoch over metadata_epoch.
> > > > > My current concern is that the use of leader_epoch and
> > > > > partition_epoch requires considerable changes to the consumer's
> > > > > public API to take care of the case where the user stores offsets
> > > > > externally. For example, consumer.commitSync(...) would have to take
> > > > > a map whose value is <offset, metadata, leader epoch, partition
> > > > > epoch>, and consumer.seek(...) would also need leader_epoch and
> > > > > partition_epoch as parameters. Technically we can probably still
> > > > > make it work in a backward compatible manner after careful design
> > > > > and discussion. But these changes can make the consumer's interface
> > > > > unnecessarily complex for most users, who do not store offsets
> > > > > externally.
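> > > > >
> > > > > Just to illustrate the surface area I am worried about, the
> > > > > signatures would have to grow roughly like this (purely
> > > > > hypothetical, not something I am proposing):
> > > > >
> > > > > import java.util.Map;
> > > > > import org.apache.kafka.common.TopicPartition;
> > > > >
> > > > > // Purely hypothetical, only to show the extra parameters users would face.
> > > > > interface EpochBurdenedConsumer {
> > > > >     void seek(TopicPartition partition, long offset,
> > > > >               int leaderEpoch, int partitionEpoch);
> > > > >     void commitSync(Map<TopicPartition, OffsetMetadataAndEpochs> offsets);
> > > > > }
> > > > >
> > > > > // The committed value would grow from <offset, metadata> to something like:
> > > > > class OffsetMetadataAndEpochs {
> > > > >     long offset;
> > > > >     String metadata;
> > > > >     int leaderEpoch;
> > > > >     int partitionEpoch;
> > > > > }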
> > > > >
> > > > > After thinking more about it, we can address all of the problems
> > > > > discussed by using only the metadata_epoch, without introducing
> > > > > leader_epoch or partition_epoch. The current KIP describes the
> > > > > changes to the consumer API and how the new API can be used if the
> > > > > user stores offsets externally. In order to address the scenario you
> > > > > described earlier, we can include metadata_epoch in the FetchResponse
> > > > > and the LeaderAndIsrRequest. The consumer remembers the largest
> > > > > metadata_epoch from all the FetchResponses it has received. The
> > > > > metadata_epoch committed with the offset, either within or outside
> > > > > Kafka, should be the largest metadata_epoch across all FetchResponses
> > > > > and MetadataResponses ever received by this consumer.
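> > > > >
> > > > > The client-side bookkeeping for this is tiny, something along these
> > > > > lines (illustrative only):
> > > > >
> > > > > // Illustrative only: remember the largest metadata_epoch this client has
> > > > > // ever seen, whether it arrived in a MetadataResponse or a FetchResponse.
> > > > > // This is the value committed together with the offset.
> > > > > class MetadataEpochTracker {
> > > > >     private long largestEpoch = -1L;
> > > > >
> > > > >     synchronized void observe(long metadataEpoch) {
> > > > >         largestEpoch = Math.max(largestEpoch, metadataEpoch);
> > > > >     }
> > > > >
> > > > >     synchronized long largestEpoch() {
> > > > >         return largestEpoch;
> > > > >     }
> > > > > }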
> > > > >
> > > > > The drawback of using only the metadata_epoch is that we cannot
> > > > > always do the smart offset reset in the case of unclean leader
> > > > > election which you mentioned earlier. But in most cases, unclean
> > > > > leader election probably happens when the consumer is not
> > > > > rebalancing/restarting. In these cases, either the consumer is not
> > > > > directly affected by the unclean leader election since it is not
> > > > > consuming from the end of the log, or the consumer can derive the
> > > > > leader_epoch from the most recent message received before it sees
> > > > > OffsetOutOfRangeException. So I am not sure it is worth adding
> > > > > leader_epoch to the consumer API to address the remaining corner
> > > > > case. What do you think?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > Hi, Dong,
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > To solve the topic recreation issue, we could use either a global
> > > > > > metadata version or a partition-level epoch. But either one will
> > > > > > be a new concept, right? To me, the latter seems more natural. It
> > > > > > also makes it easier to detect if a consumer's offset is still
> > > > > > valid after a topic is recreated. As you pointed out, we don't
> > > > > > need to store the partition epoch in the message. The following is
> > > > > > what I am thinking. When a partition is created, we can assign it
> > > > > > a partition epoch from an ever-increasing global counter and store
> > > > > > it in /brokers/topics/[topic]/partitions/[partitionId] in ZK. The
> > > > > > partition epoch is propagated to every broker. The consumer will
> > > > > > be tracking a tuple of <offset, leader epoch, partition epoch> for
> > > > > > offsets. If a topic is recreated, it's possible that a consumer's
> > > > > > offset and leader epoch still match those in the broker, but the
> > > > > > partition epoch won't. In this case, we can potentially still
> > > > > > treat the consumer's offset as out of range and reset the offset
> > > > > > based on the offset reset policy in the consumer. This seems
> > > > > > harder to do with a global metadata version.
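> > > > > >
> > > > > > The validity check then becomes simple, roughly (illustrative only):
> > > > > >
> > > > > > // Illustrative only: an offset is considered valid only if the committed
> > > > > > // partition epoch matches the partition's current epoch; a mismatch means
> > > > > > // the topic was recreated, so we fall back to the offset reset policy.
> > > > > > class OffsetValidity {
> > > > > >     static boolean stillValid(long committedOffset,
> > > > > >                               int committedPartitionEpoch,
> > > > > >                               int currentPartitionEpoch,
> > > > > >                               long logStartOffset, long logEndOffset) {
> > > > > >         if (committedPartitionEpoch != currentPartitionEpoch) {
> > > > > >             return false; // topic recreated: treat as out of range
> > > > > >         }
> > > > > >         return committedOffset >= logStartOffset
> > > > > >             && committedOffset <= logEndOffset;
> > > > > >     }
> > > > > > }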
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hey Jun,
> > > > > > >
> > > > > > > This is a very good example. After thinking through this in
> > > > > > > detail, I agree that we need to commit offset with leader epoch
> > > > > > > in order to address this example.
> > > > > > >
> > > > > > > I think the remaining question is how to address the scenario
> > > > > > > where the topic is deleted and re-created. One possible solution
> > > > > > > is to commit the offset with both the leader epoch and the
> > > > > > > metadata version. The logic and the implementation of this
> > > > > > > solution do not require a new concept (e.g. partition epoch) and
> > > > > > > do not require any change to the message format or leader epoch.
> > > > > > > It also allows us to order the metadata in a straightforward
> > > > > > > manner, which may be useful in the future. So it may be a better
> > > > > > > solution than generating a random partition epoch every time we
> > > > > > > create a partition. Does this sound reasonable?
> > > > > > >
> > > > > > > Previously, one concern with using the metadata version was that
> > > > > > > the consumer would be forced to refresh metadata even if the
> > > > > > > metadata version is increased due to topics that the consumer is
> > > > > > > not interested in. Now I realize that this is probably not a
> > > > > > > problem. Currently the client will refresh metadata either due to
> > > > > > > an InvalidMetadataException in the response from the broker or
> > > > > > > due to metadata expiry. The addition of the metadata version
> > > > > > > should not increase the overhead of metadata refresh caused by
> > > > > > > InvalidMetadataException. If the client refreshes metadata due to
> > > > > > > expiry and it receives metadata whose version is lower than the
> > > > > > > current metadata version, we can reject the metadata but still
> > > > > > > reset the metadata age, which essentially keeps the existing
> > > > > > > behavior in the client.
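> > > > > > >
> > > > > > > In other words, the client-side update logic would be roughly the
> > > > > > > following (illustrative only, not the actual Metadata class):
> > > > > > >
> > > > > > > // Illustrative only: a refresh always resets the metadata age, but a
> > > > > > > // response carrying an older metadata version is rejected, not applied.
> > > > > > > class VersionedClientMetadata {
> > > > > > >     private long version = -1L;
> > > > > > >     private long lastRefreshMs;
> > > > > > >     private Object cluster;  // stand-in for the actual cluster metadata
> > > > > > >
> > > > > > >     synchronized void update(Object newCluster, long newVersion, long nowMs) {
> > > > > > >         lastRefreshMs = nowMs;  // keep the existing refresh cadence either way
> > > > > > >         if (newVersion < version) {
> > > > > > >             return;             // stale cluster view: reject the metadata
> > > > > > >         }
> > > > > > >         version = newVersion;
> > > > > > >         cluster = newCluster;
> > > > > > >     }
> > > > > > > }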
> > > > > > >
> > > > > > > Thanks much,
> > > > > > > Dong
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
