Hi Dong,

Sorry for the late reply. I think the latest revision is looking good. I
have a few minor suggestions:

1. The name "partition_epoch" makes me think it changes independently at
the partition level, but all partitions for a topic should have the same
epoch. Maybe "topic_epoch" is nearer the mark?
2. Should we also increment this epoch when a topic is deleted? When the
broker returns an UNKNOWN_TOPIC_OR_PARTITION error in a metadata response,
we can also include the latest partition epoch, which would allow the
client to disambiguate the error if it has seen more recent metadata.
3. I am still wondering whether it is a good idea to expose these epochs in
the consumer API. As an alternative, have you considered representing the
data as an opaque blob of bytes? For example:

class OffsetAndMetadata {
  long offset;
  byte[] offsetMetadata; // opaque, versioned epoch information supplied by the consumer
  String metadata;       // existing user-provided metadata string
}

Admittedly, the naming is a bit annoying, but we can probably come up with
something better. Internally the byte array would carry a version, so if we
need to add anything else in the future, we can bump the version without
introducing any new APIs.
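
To make that concrete, here is a rough sketch of what the versioned encoding
could look like. The OffsetEpochData name, the single version byte, and the
two int fields are just placeholders for illustration, not a concrete
proposal:

import java.nio.ByteBuffer;

// Illustrative only: an opaque, versioned container for epoch information.
class OffsetEpochData {
  private static final byte CURRENT_VERSION = 0;

  final int partitionEpoch;
  final int leaderEpoch;

  OffsetEpochData(int partitionEpoch, int leaderEpoch) {
    this.partitionEpoch = partitionEpoch;
    this.leaderEpoch = leaderEpoch;
  }

  // Layout: [version][partition_epoch][leader_epoch].
  byte[] serialize() {
    ByteBuffer buf = ByteBuffer.allocate(1 + 4 + 4);
    buf.put(CURRENT_VERSION);
    buf.putInt(partitionEpoch);
    buf.putInt(leaderEpoch);
    return buf.array();
  }

  // A later version could append fields here without any consumer API change.
  static OffsetEpochData deserialize(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    byte version = buf.get();
    if (version > CURRENT_VERSION)
      throw new IllegalArgumentException("Unsupported offsetMetadata version: " + version);
    return new OffsetEpochData(buf.getInt(), buf.getInt());
  }
}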

The corresponding seek() and position() APIs might look something like this:

void seek(TopicPartition partition, long offset, byte[] offsetMetadata);
byte[] positionMetadata(TopicPartition partition);
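
For a consumer that stores offsets outside Kafka, usage could look roughly
like the sketch below. The ExternalOffsetStore name and the in-memory maps
are just placeholders, and positionMetadata() and the three-argument seek()
are of course the proposed additions rather than existing methods:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: persist the opaque epoch bytes next to the offset.
class ExternalOffsetStore {
  private final Map<TopicPartition, Long> offsets = new HashMap<>();
  private final Map<TopicPartition, byte[]> epochData = new HashMap<>();

  // Save the current position together with its opaque metadata.
  void save(Consumer<?, ?> consumer, TopicPartition tp) {
    offsets.put(tp, consumer.position(tp));
    epochData.put(tp, consumer.positionMetadata(tp));  // proposed API
  }

  // On restart, seek back to the saved position with its opaque metadata.
  void restore(Consumer<?, ?> consumer, TopicPartition tp) {
    consumer.seek(tp, offsets.get(tp), epochData.get(tp));  // proposed overload
  }
}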

What do you think?

Thanks,
Jason

On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun, Jason,
>
> Thanks much for all the feedback. I have updated the KIP based on the
> latest discussion. Can you help check whether it looks good?
>
> Thanks,
> Dong
>
> On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > Hmm... thinking about this more, I am not sure that the proposed API is
> > sufficient. For users that store offset externally, we probably need extra
> > API to return the leader_epoch and partition_epoch for all partitions that
> > consumers are consuming. I suppose these users currently use position() to
> > get the offset. Thus we probably need a new method positionWithEpoch(..)
> > to return <offset, partition_epoch, leader_epoch>. Does this sound
> > reasonable?
> >
> > Thanks,
> > Dong
> >
> >
> > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io> wrote:
> >
> >> Hi, Dong,
> >>
> >> Yes, that's what I am thinking. OffsetEpoch will be composed of
> >> (partition_epoch, leader_epoch).
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>
> >> > Hey Jun,
> >> >
> >> > Thanks much. I like the new API that you proposed. I am not sure what
> >> > exactly you mean by offset_epoch. I suppose that we can use the pair of
> >> > (partition_epoch, leader_epoch) as the offset_epoch, right?
> >> >
> >> > Thanks,
> >> > Dong
> >> >
> >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io> wrote:
> >> >
> >> > > Hi, Dong,
> >> > >
> >> > > Got it. The api that you proposed works. The question is whether
> >> > > that's the api that we want to have in the long term. My concern is
> >> > > that while the api change is simple, the new api seems harder to
> >> > > explain and use. For example, a consumer storing offsets externally
> >> > > now needs to call waitForMetadataUpdate() after calling seek().
> >> > >
> >> > > An alternative approach is to make the following compatible api
> >> > > changes in Consumer.
> >> > > * Add an additional OffsetEpoch field in OffsetAndMetadata. (no need
> >> > > to change the commitSync() api)
> >> > > * Add a new api seek(TopicPartition partition, long offset,
> >> > > OffsetEpoch offsetEpoch). We can potentially deprecate the old api
> >> > > seek(TopicPartition partition, long offset) in the future.
> >> > >
> >> > > The alternative approach has a similar amount of api changes as
> >> > > yours but has the following benefits.
> >> > > 1. The api works in a similar way as how offset management works now
> >> > > and is probably what we want in the long term.
> >> > > 2. It can reset offsets better when there is data loss due to
> >> > > unclean leader election or correlated replica failure.
> >> > > 3. It can reset offsets better when a topic is recreated.
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <lindon...@gmail.com> wrote:
> >> > >
> >> > > > Hey Jun,
> >> > > >
> >> > > > Yeah, I agree that ideally we don't want an ever-growing global
> >> > > > metadata version. I just think it may be more desirable to keep
> >> > > > the consumer API simple.
> >> > > >
> >> > > > In my current proposal, the metadata version returned in the fetch
> >> > > > response will be stored together with the offset. More
> >> > > > specifically, the metadata_epoch in the new offset topic schema
> >> > > > will be the largest metadata_epoch from all the MetadataResponse
> >> > > > and FetchResponse ever received by this consumer.
> >> > > >
> >> > > > We probably don't have to change the consumer API for
> >> > > > commitSync(Map<TopicPartition, OffsetAndMetadata>). If the user
> >> > > > calls commitSync(...) to commit offset 10 for a given partition,
> >> > > > for most use-cases, this consumer instance should have consumed
> >> > > > the message with offset 9 from this partition, in which case the
> >> > > > consumer can remember and use the metadata_epoch from the
> >> > > > corresponding FetchResponse when committing the offset. If the
> >> > > > user calls commitSync(..) to commit offset 10 for a given
> >> > > > partition without having consumed the message with offset 9 using
> >> > > > this consumer instance, this is probably an advanced use-case. In
> >> > > > this case the advanced user can retrieve the metadata_epoch using
> >> > > > the newly added metadataEpoch() API after it fetches the message
> >> > > > with offset 9 (probably from another consumer instance) and encode
> >> > > > this metadata_epoch in the string OffsetAndMetadata.metadata. Do
> >> > > > you think this solution would work?
> >> > > >
> >> > > > By "not sure that I fully understand your latest suggestion", are
> >> > > > you referring to the solution related to unclean leader election
> >> > > > using leader_epoch in my previous email?
> >> > > >
> >> > > > Thanks,
> >> > > > Dong
> >> > > >
> >> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <j...@confluent.io> wrote:
> >> > > >
> >> > > > > Hi, Dong,
> >> > > > >
> >> > > > > Not sure that I fully understand your latest suggestion.
> >> > > > > Returning an ever-growing global metadata version itself is not
> >> > > > > ideal, but is fine. My question is whether the metadata version
> >> > > > > returned in the fetch response needs to be stored together with
> >> > > > > the offset if offsets are stored externally. If so, we also have
> >> > > > > to change the consumer API for commitSync() and need to worry
> >> > > > > about compatibility. If we don't store the metadata version
> >> > > > > together with the offset, on a consumer restart, it's not clear
> >> > > > > how we can ensure the metadata in the consumer is high enough
> >> > > > > since there is no metadata version to compare with.
> >> > > > >
> >> > > > > Thanks,
> >> > > > >
> >> > > > > Jun
> >> > > > >
> >> > > > >
> >> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <lindon...@gmail.com> wrote:
> >> > > > >
> >> > > > > > Hey Jun,
> >> > > > > >
> >> > > > > > Thanks much for the explanation.
> >> > > > > >
> >> > > > > > I understand the advantage of partition_epoch over
> >> > > > > > metadata_epoch. My current concern is that the use of
> >> > > > > > leader_epoch and the partition_epoch requires considerable
> >> > > > > > change to the consumer's public API to take care of the case
> >> > > > > > where the user stores offsets externally. For example,
> >> > > > > > consumer.commitSync(..) would have to take a map whose value
> >> > > > > > is <offset, metadata, leader epoch, partition epoch>.
> >> > > > > > consumer.seek(...) would also need leader_epoch and
> >> > > > > > partition_epoch as parameters. Technically we can probably
> >> > > > > > still make it work in a backward-compatible manner after
> >> > > > > > careful design and discussion. But these changes can make the
> >> > > > > > consumer's interface unnecessarily complex for most users who
> >> > > > > > do not store offsets externally.
> >> > > > > >
> >> > > > > > After thinking more about it, we can address all problems
> >> > > > > > discussed by only using the metadata_epoch without introducing
> >> > > > > > leader_epoch or the partition_epoch. The current KIP describes
> >> > > > > > the changes to the consumer API and how the new API can be
> >> > > > > > used if the user stores offsets externally. In order to
> >> > > > > > address the scenario you described earlier, we can include
> >> > > > > > metadata_epoch in the FetchResponse and the
> >> > > > > > LeaderAndIsrRequest. The consumer remembers the largest
> >> > > > > > metadata_epoch from all the FetchResponse it has received. The
> >> > > > > > metadata_epoch committed with the offset, either within or
> >> > > > > > outside Kafka, should be the largest metadata_epoch across all
> >> > > > > > FetchResponse and MetadataResponse ever received by this
> >> > > > > > consumer.
> >> > > > > >
> >> > > > > > The drawback of using only the metadata_epoch is that we
> >> > > > > > cannot always do the smart offset reset in case of unclean
> >> > > > > > leader election which you mentioned earlier. But in most
> >> > > > > > cases, unclean leader election probably happens when the
> >> > > > > > consumer is not rebalancing/restarting. In these cases, either
> >> > > > > > the consumer is not directly affected by unclean leader
> >> > > > > > election since it is not consuming from the end of the log, or
> >> > > > > > the consumer can derive the leader_epoch from the most recent
> >> > > > > > message received before it sees OffsetOutOfRangeException. So
> >> > > > > > I am not sure it is worth adding the leader_epoch to the
> >> > > > > > consumer API to address the remaining corner case. What do you
> >> > > > > > think?
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Dong
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <j...@confluent.io> wrote:
> >> > > > > >
> >> > > > > > > Hi, Dong,
> >> > > > > > >
> >> > > > > > > Thanks for the reply.
> >> > > > > > >
> >> > > > > > > To solve the topic recreation issue, we could use either a
> >> > > > > > > global metadata version or a partition-level epoch. But
> >> > > > > > > either one will be a new concept, right? To me, the latter
> >> > > > > > > seems more natural. It also makes it easier to detect if a
> >> > > > > > > consumer's offset is still valid after a topic is recreated.
> >> > > > > > > As you pointed out, we don't need to store the partition
> >> > > > > > > epoch in the message. The following is what I am thinking.
> >> > > > > > > When a partition is created, we can assign a partition epoch
> >> > > > > > > from an ever-increasing global counter and store it in
> >> > > > > > > /brokers/topics/[topic]/partitions/[partitionId] in ZK. The
> >> > > > > > > partition epoch is propagated to every broker. The consumer
> >> > > > > > > will be tracking a tuple of <offset, leader epoch, partition
> >> > > > > > > epoch> for offsets. If a topic is recreated, it's possible
> >> > > > > > > that a consumer's offset and leader epoch still match that
> >> > > > > > > in the broker, but the partition epoch won't. In this case,
> >> > > > > > > we can potentially still treat the consumer's offset as out
> >> > > > > > > of range and reset the offset based on the offset reset
> >> > > > > > > policy in the consumer. This seems harder to do with a
> >> > > > > > > global metadata version.
> >> > > > > > >
> >> > > > > > > Jun
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <lindon...@gmail.com> wrote:
> >> > > > > > >
> >> > > > > > > > Hey Jun,
> >> > > > > > > >
> >> > > > > > > > This is a very good example. After thinking through this
> >> > > > > > > > in detail, I agree that we need to commit the offset with
> >> > > > > > > > the leader epoch in order to address this example.
> >> > > > > > > >
> >> > > > > > > > I think the remaining question is how to address the
> >> > > > > > > > scenario where the topic is deleted and re-created. One
> >> > > > > > > > possible solution is to commit the offset with both the
> >> > > > > > > > leader epoch and the metadata version. The logic and the
> >> > > > > > > > implementation of this solution do not require a new
> >> > > > > > > > concept (e.g. partition epoch) and do not require any
> >> > > > > > > > change to the message format or leader epoch. It also
> >> > > > > > > > allows us to order the metadata in a straightforward
> >> > > > > > > > manner which may be useful in the future. So it may be a
> >> > > > > > > > better solution than generating a random partition epoch
> >> > > > > > > > every time we create a partition. Does this sound
> >> > > > > > > > reasonable?
> >> > > > > > > >
> >> > > > > > > > Previously one concern with using the metadata version was
> >> > > > > > > > that the consumer will be forced to refresh metadata even
> >> > > > > > > > if the metadata version is increased due to topics that
> >> > > > > > > > the consumer is not interested in. Now I realize that this
> >> > > > > > > > is probably not a problem. Currently the client will
> >> > > > > > > > refresh metadata either due to an InvalidMetadataException
> >> > > > > > > > in the response from the broker or due to metadata expiry.
> >> > > > > > > > The addition of the metadata version should not increase
> >> > > > > > > > the overhead of metadata refresh caused by
> >> > > > > > > > InvalidMetadataException. If the client refreshes metadata
> >> > > > > > > > due to expiry and receives metadata whose version is lower
> >> > > > > > > > than the current metadata version, we can reject the
> >> > > > > > > > metadata but still reset the metadata age, which
> >> > > > > > > > essentially keeps the existing behavior in the client.
> >> > > > > > > >
> >> > > > > > > > Thanks much,
> >> > > > > > > > Dong
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>
