Hi, Dong,

Thanks for the updated KIP. A few more comments.

60. Perhaps having a partition epoch is more flexible since in the future,
we may support deleting a partition as well.

61. It seems that the leader epoch returned in the position() call should
be the leader epoch returned in the fetch response, not the one in the
metadata cache of the client.
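
A rough sketch of what I mean (all names here are placeholders, not a
concrete proposal): the consumer could remember, per partition, the leader
epoch carried in the last fetch response and return that from position(),
rather than whatever is currently in its metadata cache.

import java.util.HashMap;
import java.util.Map;

class FetchedPositionTracker {
  static final class FetchedPosition {
    final long offset;
    final int leaderEpoch; // the epoch carried in the fetch response

    FetchedPosition(long offset, int leaderEpoch) {
      this.offset = offset;
      this.leaderEpoch = leaderEpoch;
    }
  }

  private final Map<String, FetchedPosition> positions = new HashMap<>();

  // Called while processing a fetch response for this partition.
  void onFetchResponse(String topicPartition, long nextOffset, int leaderEpoch) {
    positions.put(topicPartition, new FetchedPosition(nextOffset, leaderEpoch));
  }

  // position() then exposes the epoch observed in the fetch response rather
  // than the one in the client's metadata cache.
  FetchedPosition position(String topicPartition) {
    return positions.get(topicPartition);
  }
}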

62. I am wondering if we should return the partition epoch in the fetch
response as well. In the current proposal, if a topic is recreated and the
new leader is on the same broker as the old one, there is nothing to force
the metadata refresh in the client. So, the client may still associate the
offset with the old partition epoch.
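
If the fetch response carried the partition epoch, the client could detect
the recreation even when the leader stays on the same broker, roughly like
this (purely illustrative, names are placeholders only):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class PartitionEpochChecker {
  private final Map<String, Integer> cachedPartitionEpochs = new ConcurrentHashMap<>();
  private volatile boolean metadataUpdateRequested = false;

  // Called while processing a fetch response that includes the partition epoch.
  void onFetchResponse(String topicPartition, int partitionEpochInResponse) {
    Integer previous = cachedPartitionEpochs.put(topicPartition, partitionEpochInResponse);
    if (previous != null && previous != partitionEpochInResponse) {
      // The epoch changed underneath us: the topic was likely deleted and
      // recreated, so force a metadata refresh instead of silently associating
      // the committed offset with the old partition epoch.
      metadataUpdateRequested = true;
    }
  }

  boolean metadataUpdateRequested() {
    return metadataUpdateRequested;
  }
}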

63. There is some subtle coordination between the LeaderAndIsrRequest and
the UpdateMetadataRequest. Currently, when a leader changes, the controller
first sends the LeaderAndIsrRequest to the assigned replicas and the
UpdateMetadataRequest to every broker. So, there could be a small window in
which the leader has already received the new partition epoch in the
LeaderAndIsrRequest, but the metadata cache in the broker hasn't yet been
updated with the latest partition epoch. I am not sure what the best way to
address this issue is. Perhaps we can update the metadata cache on the
broker with both the LeaderAndIsrRequest and the UpdateMetadataRequest. The
challenge is that the two carry slightly different data. For example, only
the latter has all the endpoints.
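
If we do go that route, the merge could look roughly like the following
(purely illustrative, not the actual broker code): both requests refresh the
epochs, while only the UpdateMetadataRequest is allowed to overwrite the
endpoint information that it alone carries.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class BrokerMetadataCacheSketch {
  static final class PartitionState {
    final int partitionEpoch;
    final int leaderEpoch;
    final String endpoints; // stand-in for the full endpoint information

    PartitionState(int partitionEpoch, int leaderEpoch, String endpoints) {
      this.partitionEpoch = partitionEpoch;
      this.leaderEpoch = leaderEpoch;
      this.endpoints = endpoints;
    }
  }

  private final Map<String, PartitionState> cache = new ConcurrentHashMap<>();

  // LeaderAndIsrRequest: refresh the epochs but keep the previously known
  // endpoints, since this request does not carry all of them.
  void onLeaderAndIsr(String topicPartition, int partitionEpoch, int leaderEpoch) {
    cache.compute(topicPartition, (tp, old) ->
        new PartitionState(partitionEpoch, leaderEpoch, old == null ? null : old.endpoints));
  }

  // UpdateMetadataRequest: carries everything, including all endpoints.
  void onUpdateMetadata(String topicPartition, int partitionEpoch, int leaderEpoch,
                        String endpoints) {
    cache.put(topicPartition, new PartitionState(partitionEpoch, leaderEpoch, endpoints));
  }
}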

64. The enforcement of leader epoch in offset commit: we allow a consumer
to set an arbitrary offset, so it's possible for the offset or the leader
epoch to go backwards. I am not sure we could always enforce that the
leader epoch only goes up on the broker.
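
To make the concern concrete, a strict check like the sketch below
(hypothetical, not proposed code) would reject a consumer that legitimately
seeks back to an older offset and commits the older offset/epoch pair.

final class OffsetCommitEpochCheck {
  // The strict enforcement in question: never let the committed leader epoch
  // go down for a partition.
  static boolean accept(int lastCommittedLeaderEpoch, int leaderEpochInNewCommit) {
    return leaderEpochInNewCommit >= lastCommittedLeaderEpoch;
  }

  public static void main(String[] args) {
    // A legitimate rewind: the user seeks back to data written under epoch 3
    // after having committed an offset under epoch 5. The strict check
    // rejects it, which is why it may be too strong.
    System.out.println(accept(5, 3)); // prints false
  }
}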

65. Good point on handling missing partition epoch due to topic deletion.
Another potential way to address this is to additionally propagate the
global partition epoch to brokers and the clients. This way, when a
partition epoch is missing, we can use the global partition epoch to reason
about which metadata is more recent.
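
Roughly, the comparison could fall back to the global counter whenever the
per-partition epoch is gone (illustrative only, all names are placeholders):

import java.util.Optional;

final class MetadataRecencySketch {
  // Returns a negative number, zero, or a positive number if metadata A is
  // older than, as recent as, or newer than metadata B.
  static int compare(Optional<Integer> partitionEpochA, int globalPartitionEpochA,
                     Optional<Integer> partitionEpochB, int globalPartitionEpochB) {
    if (partitionEpochA.isPresent() && partitionEpochB.isPresent()) {
      return Integer.compare(partitionEpochA.get(), partitionEpochB.get());
    }
    // The per-partition epoch is missing (e.g. the topic was deleted), so fall
    // back to the globally increasing partition epoch counter.
    return Integer.compare(globalPartitionEpochA, globalPartitionEpochB);
  }
}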

66. A client may also get an offset by time using the offsetsForTimes() API.
So, we probably want to include offsetInternalMetadata in OffsetAndTimestamp
as well.
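
One possible shape, just to illustrate the idea (and subject to the naming
discussion in 67 below):

public class OffsetAndTimestamp {
  private final long offset;
  private final long timestamp;
  private final byte[] offsetInternalMetadata; // opaque epoch info; see 67 on naming

  public OffsetAndTimestamp(long offset, long timestamp, byte[] offsetInternalMetadata) {
    this.offset = offset;
    this.timestamp = timestamp;
    this.offsetInternalMetadata = offsetInternalMetadata;
  }

  public long offset() { return offset; }
  public long timestamp() { return timestamp; }
  public byte[] offsetInternalMetadata() { return offsetInternalMetadata; }
}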

67. InternalMetadata can be a bit confusing with the metadata field already
there. Perhaps we can just call it OffsetEpoch. It might be useful to make
OffsetEpoch printable, at least for debugging purposes. Once you do that, we
are already exposing the internal fields, so I am not sure it's worth hiding
them. If we do want to hide them, perhaps we can have something like the
following. The binary encoding is probably more efficient than JSON for
external storage.

OffsetEpoch {
  public static OffsetEpoch decode(byte[]);

  public byte[] encode();

  public String toString();
}
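
For example, if OffsetEpoch ends up being composed of (partition_epoch,
leader_epoch) as discussed earlier in the thread, the binary encoding could
be as simple as the following (purely illustrative, not a proposed wire
format):

import java.nio.ByteBuffer;

public class OffsetEpoch {
  private static final byte VERSION = 0;

  private final int partitionEpoch;
  private final int leaderEpoch;

  public OffsetEpoch(int partitionEpoch, int leaderEpoch) {
    this.partitionEpoch = partitionEpoch;
    this.leaderEpoch = leaderEpoch;
  }

  public static OffsetEpoch decode(byte[] bytes) {
    ByteBuffer buffer = ByteBuffer.wrap(bytes);
    byte version = buffer.get();
    if (version != VERSION)
      throw new IllegalArgumentException("Unknown OffsetEpoch version " + version);
    return new OffsetEpoch(buffer.getInt(), buffer.getInt());
  }

  public byte[] encode() {
    return ByteBuffer.allocate(1 + 4 + 4) // version + partition epoch + leader epoch
        .put(VERSION)
        .putInt(partitionEpoch)
        .putInt(leaderEpoch)
        .array();
  }

  @Override
  public String toString() {
    return "OffsetEpoch(partitionEpoch=" + partitionEpoch
        + ", leaderEpoch=" + leaderEpoch + ")";
  }
}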

Jun

On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jason,
>
> Certainly. This sounds good. I have updated the KIP to clarity that the
> global epoch will be incremented by 1 each time a topic is deleted.
>
> Thanks,
> Dong
>
> On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Hi Dong,
> >
> >
> > I think your approach will allow user to distinguish between the metadata
> > > before and after the topic deletion. I also agree that this can be
> > > potentially be useful to user. I am just not very sure whether we
> already
> > > have a good use-case to make the additional complexity worthwhile. It
> > seems
> > > that this feature is kind of independent of the main problem of this
> KIP.
> > > Could we add this as a future work?
> >
> >
> > Do you think it's fair if we bump the topic epoch on deletion and leave
> > propagation of the epoch for deleted topics for future work? I don't
> think
> > this adds much complexity and it makes the behavior consistent: every
> topic
> > mutation results in an epoch bump.
> >
> > Thanks,
> > Jason
> >
> > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Ismael,
> > >
> > > I guess we actually need user to see this field so that user can store
> > this
> > > value in the external store together with the offset. We just prefer
> the
> > > value to be opaque to discourage most users from interpreting this
> value.
> > > One more advantage of using such an opaque field is to be able to
> evolve
> > > the information (or schema) of this value without changing consumer API
> > in
> > > the future.
> > >
> > > I also thinking it is probably OK for user to be able to interpret this
> > > value, particularly for those advanced users.
> > >
> > > Thanks,
> > > Dong
> > >
> > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> > >
> > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <ja...@confluent.io>
> > > > wrote:
> > > > >
> > > > > class OffsetAndMetadata {
> > > > >   long offset;
> > > > >   byte[] offsetMetadata;
> > > > >   String metadata;
> > > > > }
> > > >
> > > >
> > > > > Admittedly, the naming is a bit annoying, but we can probably come
> up
> > > > with
> > > > > something better. Internally the byte array would have a version.
> If
> > in
> > > > the
> > > > > future we have anything else we need to add, we can update the
> > version
> > > > and
> > > > > we wouldn't need any new APIs.
> > > > >
> > > >
> > > > We can also add fields to a class in a compatible way. So, it seems
> to
> > me
> > > > that the main advantage of the byte array is that it's opaque to the
> > > user.
> > > > Is that correct? If so, we could also add any opaque metadata in a
> > > subclass
> > > > so that users don't even see it (unless they cast it, but then
> they're
> > on
> > > > their own).
> > > >
> > > > Ismael
> > > >
> > > > The corresponding seek() and position() APIs might look something
> like
> > > > this:
> > > > >
> > > > > void seek(TopicPartition partition, long offset, byte[]
> > > offsetMetadata);
> > > > > byte[] positionMetadata(TopicPartition partition);
> > > > >
> > > > > What do you think?
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hey Jun, Jason,
> > > > > >
> > > > > > Thanks much for all the feedback. I have updated the KIP based on
> > the
> > > > > > latest discussion. Can you help check whether it looks good?
> > > > > >
> > > > > > Thanks,
> > > > > > Dong
> > > > > >
> > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hey Jun,
> > > > > > >
> > > > > > > Hmm... thinking about this more, I am not sure that the
> proposed
> > > API
> > > > is
> > > > > > > sufficient. For users that store offset externally, we probably
> > > need
> > > > > > extra
> > > > > > > API to return the leader_epoch and partition_epoch for all
> > > partitions
> > > > > > that
> > > > > > > consumers are consuming. I suppose these users currently use
> > > > position()
> > > > > > to
> > > > > > > get the offset. Thus we probably need a new method
> > > > > positionWithEpoch(..)
> > > > > > to
> > > > > > > return <offset, partition_epoch, leader_epoch>. Does this sound
> > > > > > reasonable?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io>
> > wrote:
> > > > > > >
> > > > > > >> Hi, Dong,
> > > > > > >>
> > > > > > >> Yes, that's what I am thinking. OffsetEpoch will be composed
> of
> > > > > > >> (partition_epoch,
> > > > > > >> leader_epoch).
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >>
> > > > > > >> Jun
> > > > > > >>
> > > > > > >>
> > > > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com
> >
> > > > wrote:
> > > > > > >>
> > > > > > >> > Hey Jun,
> > > > > > >> >
> > > > > > >> > Thanks much. I like the the new API that you proposed. I am
> > not
> > > > sure
> > > > > > >> what
> > > > > > >> > you exactly mean by offset_epoch. I suppose that we can use
> > the
> > > > pair
> > > > > > of
> > > > > > >> > (partition_epoch, leader_epoch) as the offset_epoch, right?
> > > > > > >> >
> > > > > > >> > Thanks,
> > > > > > >> > Dong
> > > > > > >> >
> > > > > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <j...@confluent.io>
> > > wrote:
> > > > > > >> >
> > > > > > >> > > Hi, Dong,
> > > > > > >> > >
> > > > > > >> > > Got it. The api that you proposed works. The question is
> > > whether
> > > > > > >> that's
> > > > > > >> > the
> > > > > > >> > > api that we want to have in the long term. My concern is
> > that
> > > > > while
> > > > > > >> the
> > > > > > >> > api
> > > > > > >> > > change is simple, the new api seems harder to explain and
> > use.
> > > > For
> > > > > > >> > example,
> > > > > > >> > > a consumer storing offsets externally now needs to call
> > > > > > >> > > waitForMetadataUpdate() after calling seek().
> > > > > > >> > >
> > > > > > >> > > An alternative approach is to make the following
> compatible
> > > api
> > > > > > >> changes
> > > > > > >> > in
> > > > > > >> > > Consumer.
> > > > > > >> > > * Add an additional OffsetEpoch field in
> OffsetAndMetadata.
> > > (no
> > > > > need
> > > > > > >> to
> > > > > > >> > > change the CommitSync() api)
> > > > > > >> > > * Add a new api seek(TopicPartition partition, long
> offset,
> > > > > > >> OffsetEpoch
> > > > > > >> > > offsetEpoch). We can potentially deprecate the old api
> > > > > > >> > seek(TopicPartition
> > > > > > >> > > partition, long offset) in the future.
> > > > > > >> > >
> > > > > > >> > > The alternative approach has similar amount of api changes
> > as
> > > > > yours
> > > > > > >> but
> > > > > > >> > has
> > > > > > >> > > the following benefits.
> > > > > > >> > > 1. The api works in a similar way as how offset management
> > > works
> > > > > now
> > > > > > >> and
> > > > > > >> > is
> > > > > > >> > > probably what we want in the long term.
> > > > > > >> > > 2. It can reset offsets better when there is data loss due
> > to
> > > > > > unclean
> > > > > > >> > > leader election or correlated replica failure.
> > > > > > >> > > 3. It can reset offsets better when topic is recreated.
> > > > > > >> > >
> > > > > > >> > > Thanks,
> > > > > > >> > >
> > > > > > >> > > Jun
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Hey Jun,
> > > > > > >> > > >
> > > > > > >> > > > Yeah I agree that ideally we don't want an ever growing
> > > global
> > > > > > >> metadata
> > > > > > >> > > > version. I just think it may be more desirable to keep
> the
> > > > > > consumer
> > > > > > >> API
> > > > > > >> > > > simple.
> > > > > > >> > > >
> > > > > > >> > > > In my current proposal, metadata version returned in the
> > > fetch
> > > > > > >> response
> > > > > > >> > > > will be stored with the offset together. More
> > specifically,
> > > > the
> > > > > > >> > > > metadata_epoch in the new offset topic schema will be
> the
> > > > > largest
> > > > > > >> > > > metadata_epoch from all the MetadataResponse and
> > > FetchResponse
> > > > > > ever
> > > > > > >> > > > received by this consumer.
> > > > > > >> > > >
> > > > > > >> > > > We probably don't have to change the consumer API for
> > > > > > >> > > > commitSync(Map<TopicPartition, OffsetAndMetadata>). If
> > user
> > > > > calls
> > > > > > >> > > > commitSync(...) to commit offset 10 for a given
> partition,
> > > for
> > > > > > most
> > > > > > >> > > > use-cases, this consumer instance should have consumed
> > > message
> > > > > > with
> > > > > > >> > > offset
> > > > > > >> > > > 9 from this partition, in which case the consumer can
> > > remember
> > > > > and
> > > > > > >> use
> > > > > > >> > > the
> > > > > > >> > > > metadata_epoch from the corresponding FetchResponse when
> > > > > > committing
> > > > > > >> > > offset.
> > > > > > >> > > > If user calls commitSync(..) to commit offset 10 for a
> > given
> > > > > > >> partition
> > > > > > >> > > > without having consumed the message with offset 9 using
> > this
> > > > > > >> consumer
> > > > > > >> > > > instance, this is probably an advanced use-case. In this
> > > case
> > > > > the
> > > > > > >> > > advanced
> > > > > > >> > > > user can retrieve the metadata_epoch using the newly
> added
> > > > > > >> > > metadataEpoch()
> > > > > > >> > > > API after it fetches the message with offset 9 (probably
> > > from
> > > > > > >> another
> > > > > > >> > > > consumer instance) and encode this metadata_epoch in the
> > > > > > >> > > > string OffsetAndMetadata.metadata. Do you think this
> > > solution
> > > > > > would
> > > > > > >> > work?
> > > > > > >> > > >
> > > > > > >> > > > By "not sure that I fully understand your latest
> > > suggestion",
> > > > > are
> > > > > > >> you
> > > > > > >> > > > referring to solution related to unclean leader election
> > > using
> > > > > > >> > > leader_epoch
> > > > > > >> > > > in my previous email?
> > > > > > >> > > >
> > > > > > >> > > > Thanks,
> > > > > > >> > > > Dong
> > > > > > >> > > >
> > > > > > >> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <
> j...@confluent.io
> > >
> > > > > wrote:
> > > > > > >> > > >
> > > > > > >> > > > > Hi, Dong,
> > > > > > >> > > > >
> > > > > > >> > > > > Not sure that I fully understand your latest
> suggestion.
> > > > > > >> Returning an
> > > > > > >> > > > ever
> > > > > > >> > > > > growing global metadata version itself is no ideal,
> but
> > is
> > > > > fine.
> > > > > > >> My
> > > > > > >> > > > > question is whether the metadata version returned in
> the
> > > > fetch
> > > > > > >> > response
> > > > > > >> > > > > needs to be stored with the offset together if offsets
> > are
> > > > > > stored
> > > > > > >> > > > > externally. If so, we also have to change the consumer
> > API
> > > > for
> > > > > > >> > > > commitSync()
> > > > > > >> > > > > and need to worry about compatibility. If we don't
> store
> > > the
> > > > > > >> metadata
> > > > > > >> > > > > version together with the offset, on a consumer
> restart,
> > > > it's
> > > > > > not
> > > > > > >> > clear
> > > > > > >> > > > how
> > > > > > >> > > > > we can ensure the metadata in the consumer is high
> > enough
> > > > > since
> > > > > > >> there
> > > > > > >> > > is
> > > > > > >> > > > no
> > > > > > >> > > > > metadata version to compare with.
> > > > > > >> > > > >
> > > > > > >> > > > > Thanks,
> > > > > > >> > > > >
> > > > > > >> > > > > Jun
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <
> > > > lindon...@gmail.com
> > > > > >
> > > > > > >> > wrote:
> > > > > > >> > > > >
> > > > > > >> > > > > > Hey Jun,
> > > > > > >> > > > > >
> > > > > > >> > > > > > Thanks much for the explanation.
> > > > > > >> > > > > >
> > > > > > >> > > > > > I understand the advantage of partition_epoch over
> > > > > > >> metadata_epoch.
> > > > > > >> > My
> > > > > > >> > > > > > current concern is that the use of leader_epoch and
> > the
> > > > > > >> > > partition_epoch
> > > > > > >> > > > > > requires us considerable change to consumer's public
> > API
> > > > to
> > > > > > take
> > > > > > >> > care
> > > > > > >> > > > of
> > > > > > >> > > > > > the case where user stores offset externally. For
> > > example,
> > > > > > >> > > *consumer*.
> > > > > > >> > > > > > *commitSync*(..) would have to take a map whose
> value
> > is
> > > > > > >> <offset,
> > > > > > >> > > > > metadata,
> > > > > > >> > > > > > leader epoch, partition epoch>.
> *consumer*.*seek*(...)
> > > > would
> > > > > > >> also
> > > > > > >> > > need
> > > > > > >> > > > > > leader_epoch and partition_epoch as parameter.
> > > Technically
> > > > > we
> > > > > > >> can
> > > > > > >> > > > > probably
> > > > > > >> > > > > > still make it work in a backward compatible manner
> > after
> > > > > > careful
> > > > > > >> > > design
> > > > > > >> > > > > and
> > > > > > >> > > > > > discussion. But these changes can make the
> consumer's
> > > > > > interface
> > > > > > >> > > > > > unnecessarily complex for more users who do not
> store
> > > > offset
> > > > > > >> > > > externally.
> > > > > > >> > > > > >
> > > > > > >> > > > > > After thinking more about it, we can address all
> > > problems
> > > > > > >> discussed
> > > > > > >> > > by
> > > > > > >> > > > > only
> > > > > > >> > > > > > using the metadata_epoch without introducing
> > > leader_epoch
> > > > or
> > > > > > the
> > > > > > >> > > > > > partition_epoch. The current KIP describes the
> changes
> > > to
> > > > > the
> > > > > > >> > > consumer
> > > > > > >> > > > > API
> > > > > > >> > > > > > and how the new API can be used if user stores
> offset
> > > > > > >> externally.
> > > > > > >> > In
> > > > > > >> > > > > order
> > > > > > >> > > > > > to address the scenario you described earlier, we
> can
> > > > > include
> > > > > > >> > > > > > metadata_epoch in the FetchResponse and the
> > > > > > LeaderAndIsrRequest.
> > > > > > >> > > > Consumer
> > > > > > >> > > > > > remembers the largest metadata_epoch from all the
> > > > > > FetchResponse
> > > > > > >> it
> > > > > > >> > > has
> > > > > > >> > > > > > received. The metadata_epoch committed with the
> > offset,
> > > > > either
> > > > > > >> > within
> > > > > > >> > > > or
> > > > > > >> > > > > > outside Kafka, should be the largest metadata_epoch
> > > across
> > > > > all
> > > > > > >> > > > > > FetchResponse and MetadataResponse ever received by
> > this
> > > > > > >> consumer.
> > > > > > >> > > > > >
> > > > > > >> > > > > > The drawback of using only the metadata_epoch is
> that
> > we
> > > > can
> > > > > > not
> > > > > > >> > > always
> > > > > > >> > > > > do
> > > > > > >> > > > > > the smart offset reset in case of unclean leader
> > > election
> > > > > > which
> > > > > > >> you
> > > > > > >> > > > > > mentioned earlier. But in most case, unclean leader
> > > > election
> > > > > > >> > probably
> > > > > > >> > > > > > happens when consumer is not rebalancing/restarting.
> > In
> > > > > these
> > > > > > >> > cases,
> > > > > > >> > > > > either
> > > > > > >> > > > > > consumer is not directly affected by unclean leader
> > > > election
> > > > > > >> since
> > > > > > >> > it
> > > > > > >> > > > is
> > > > > > >> > > > > > not consuming from the end of the log, or consumer
> can
> > > > > derive
> > > > > > >> the
> > > > > > >> > > > > > leader_epoch from the most recent message received
> > > before
> > > > it
> > > > > > >> sees
> > > > > > >> > > > > > OffsetOutOfRangeException. So I am not sure it is
> > worth
> > > > > adding
> > > > > > >> the
> > > > > > >> > > > > > leader_epoch to consumer API to address the
> remaining
> > > > corner
> > > > > > >> case.
> > > > > > >> > > What
> > > > > > >> > > > > do
> > > > > > >> > > > > > you think?
> > > > > > >> > > > > >
> > > > > > >> > > > > > Thanks,
> > > > > > >> > > > > > Dong
> > > > > > >> > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <
> > > j...@confluent.io
> > > > >
> > > > > > >> wrote:
> > > > > > >> > > > > >
> > > > > > >> > > > > > > Hi, Dong,
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Thanks for the reply.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > To solve the topic recreation issue, we could use
> > > > either a
> > > > > > >> global
> > > > > > >> > > > > > metadata
> > > > > > >> > > > > > > version or a partition level epoch. But either one
> > > will
> > > > > be a
> > > > > > >> new
> > > > > > >> > > > > concept,
> > > > > > >> > > > > > > right? To me, the latter seems more natural. It
> also
> > > > makes
> > > > > > it
> > > > > > >> > > easier
> > > > > > >> > > > to
> > > > > > >> > > > > > > detect if a consumer's offset is still valid
> after a
> > > > topic
> > > > > > is
> > > > > > >> > > > > recreated.
> > > > > > >> > > > > > As
> > > > > > >> > > > > > > you pointed out, we don't need to store the
> > partition
> > > > > epoch
> > > > > > in
> > > > > > >> > the
> > > > > > >> > > > > > message.
> > > > > > >> > > > > > > The following is what I am thinking. When a
> > partition
> > > is
> > > > > > >> created,
> > > > > > >> > > we
> > > > > > >> > > > > can
> > > > > > >> > > > > > > assign a partition epoch from an ever-increasing
> > > global
> > > > > > >> counter
> > > > > > >> > and
> > > > > > >> > > > > store
> > > > > > >> > > > > > > it in /brokers/topics/[topic]/
> > > partitions/[partitionId]
> > > > in
> > > > > > ZK.
> > > > > > >> > The
> > > > > > >> > > > > > > partition
> > > > > > >> > > > > > > epoch is propagated to every broker. The consumer
> > will
> > > > be
> > > > > > >> > tracking
> > > > > > >> > > a
> > > > > > >> > > > > > tuple
> > > > > > >> > > > > > > of <offset, leader epoch, partition epoch> for
> > > offsets.
> > > > > If a
> > > > > > >> > topic
> > > > > > >> > > is
> > > > > > >> > > > > > > recreated, it's possible that a consumer's offset
> > and
> > > > > leader
> > > > > > >> > epoch
> > > > > > >> > > > > still
> > > > > > >> > > > > > > match that in the broker, but partition epoch
> won't
> > > be.
> > > > In
> > > > > > >> this
> > > > > > >> > > case,
> > > > > > >> > > > > we
> > > > > > >> > > > > > > can potentially still treat the consumer's offset
> as
> > > out
> > > > > of
> > > > > > >> range
> > > > > > >> > > and
> > > > > > >> > > > > > reset
> > > > > > >> > > > > > > the offset based on the offset reset policy in the
> > > > > consumer.
> > > > > > >> This
> > > > > > >> > > > seems
> > > > > > >> > > > > > > harder to do with a global metadata version.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Jun
> > > > > > >> > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <
> > > > > > >> lindon...@gmail.com>
> > > > > > >> > > > wrote:
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > Hey Jun,
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > This is a very good example. After thinking
> > through
> > > > this
> > > > > > in
> > > > > > >> > > > detail, I
> > > > > > >> > > > > > > agree
> > > > > > >> > > > > > > > that we need to commit offset with leader epoch
> in
> > > > order
> > > > > > to
> > > > > > >> > > address
> > > > > > >> > > > > > this
> > > > > > >> > > > > > > > example.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > I think the remaining question is how to address
> > the
> > > > > > >> scenario
> > > > > > >> > > that
> > > > > > >> > > > > the
> > > > > > >> > > > > > > > topic is deleted and re-created. One possible
> > > solution
> > > > > is
> > > > > > to
> > > > > > >> > > commit
> > > > > > >> > > > > > > offset
> > > > > > >> > > > > > > > with both the leader epoch and the metadata
> > version.
> > > > The
> > > > > > >> logic
> > > > > > >> > > and
> > > > > > >> > > > > the
> > > > > > >> > > > > > > > implementation of this solution does not
> require a
> > > new
> > > > > > >> concept
> > > > > > >> > > > (e.g.
> > > > > > >> > > > > > > > partition epoch) and it does not require any
> > change
> > > to
> > > > > the
> > > > > > >> > > message
> > > > > > >> > > > > > format
> > > > > > >> > > > > > > > or leader epoch. It also allows us to order the
> > > > metadata
> > > > > > in
> > > > > > >> a
> > > > > > >> > > > > > > > straightforward manner which may be useful in
> the
> > > > > future.
> > > > > > >> So it
> > > > > > >> > > may
> > > > > > >> > > > > be
> > > > > > >> > > > > > a
> > > > > > >> > > > > > > > better solution than generating a random
> partition
> > > > epoch
> > > > > > >> every
> > > > > > >> > > time
> > > > > > >> > > > > we
> > > > > > >> > > > > > > > create a partition. Does this sound reasonable?
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Previously one concern with using the metadata
> > > version
> > > > > is
> > > > > > >> that
> > > > > > >> > > > > consumer
> > > > > > >> > > > > > > > will be forced to refresh metadata even if
> > metadata
> > > > > > version
> > > > > > >> is
> > > > > > >> > > > > > increased
> > > > > > >> > > > > > > > due to topics that the consumer is not
> interested
> > > in.
> > > > > Now
> > > > > > I
> > > > > > >> > > > realized
> > > > > > >> > > > > > that
> > > > > > >> > > > > > > > this is probably not a problem. Currently client
> > > will
> > > > > > >> refresh
> > > > > > >> > > > > metadata
> > > > > > >> > > > > > > > either due to InvalidMetadataException in the
> > > response
> > > > > > from
> > > > > > >> > > broker
> > > > > > >> > > > or
> > > > > > >> > > > > > due
> > > > > > >> > > > > > > > to metadata expiry. The addition of the metadata
> > > > version
> > > > > > >> should
> > > > > > >> > > > > > increase
> > > > > > >> > > > > > > > the overhead of metadata refresh caused by
> > > > > > >> > > > InvalidMetadataException.
> > > > > > >> > > > > If
> > > > > > >> > > > > > > > client refresh metadata due to expiry and it
> > > receives
> > > > a
> > > > > > >> > metadata
> > > > > > >> > > > > whose
> > > > > > >> > > > > > > > version is lower than the current metadata
> > version,
> > > we
> > > > > can
> > > > > > >> > reject
> > > > > > >> > > > the
> > > > > > >> > > > > > > > metadata but still reset the metadata age, which
> > > > > > essentially
> > > > > > >> > keep
> > > > > > >> > > > the
> > > > > > >> > > > > > > > existing behavior in the client.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Thanks much,
> > > > > > >> > > > > > > > Dong
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
