Hi, Dong,

My replies are the following.

60. What you described could also work. The drawback is that we would be
unnecessarily changing the partition epoch when a partition hasn't really
changed. I was imagining that the partition epoch would be stored in
/brokers/topics/[topic]/partitions/[partitionId], instead of at the topic
level. So, I am not sure that the ZK size limit is an issue.
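Just to illustrate (the class and the payload format below are my own sketch,
not something already in the KIP), the per-partition znode could look roughly
like this:

// Illustrative sketch only: where the per-partition epoch could live in ZK.
public class PartitionEpochZkPath {

    // e.g. path("my-topic", 0) -> "/brokers/topics/my-topic/partitions/0"
    public static String path(String topic, int partitionId) {
        return "/brokers/topics/" + topic + "/partitions/" + partitionId;
    }

    // A possible payload stored at that path (format is an assumption):
    // {"version": 1, "partition_epoch": 7}
}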

61, 62 and 65. To me, the offset + offset_epoch is a unique identifier for
a message. So, if a message hasn't changed, the offset and the associated
offset_epoch ideally should remain the same (it would be kind of weird if
two consumer apps save the offset of the same message, but with different
offset_epochs). partition_epoch + leader_epoch gives us that; global_epoch +
leader_epoch doesn't. If we use this approach, we can more reliably solve
not only the problem that you have identified, but also other problems when
there is data loss or topic re-creation. For example, in the future, if we
include the partition_epoch and leader_epoch in the fetch request, the
server can do a more reliable check of whether that offset is valid or not.
I am not sure that we can rely upon all external offsets being removed on
topic deletion. For example, a topic may be deleted by an admin who may not
know all the applications.
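To make the intended semantics concrete, here is a minimal sketch (the class
and method names are illustrative, not the KIP's actual schema) of how
(partition_epoch, leader_epoch) could be used to validate a stored offset:

// Minimal sketch: (partition_epoch, leader_epoch) as the offset_epoch of a message.
public class OffsetEpochSketch {
    private final int partitionEpoch;
    private final int leaderEpoch;

    public OffsetEpochSketch(int partitionEpoch, int leaderEpoch) {
        this.partitionEpoch = partitionEpoch;
        this.leaderEpoch = leaderEpoch;
    }

    // A stored offset is only meaningful if its partition_epoch matches the
    // current partition_epoch of the (possibly re-created) partition and its
    // leader_epoch is not newer than what the leader currently knows;
    // otherwise the offset should be treated as invalid.
    public boolean isValidAgainst(int currentPartitionEpoch, int currentLeaderEpoch) {
        return partitionEpoch == currentPartitionEpoch
            && leaderEpoch <= currentLeaderEpoch;
    }
}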

If we agree on the above, the second question is how to reliably propagate
the partition_epoch and the leader_epoch to the consumer when there are
leader or partition changes. The leader_epoch comes from the message, which
is reliable. So, I was suggesting that when we store an offset, we can just
store the leader_epoch from the message set containing that offset.
Similarly, I was thinking that if the partition_epoch is in the fetch
response, we can propagate the partition_epoch reliably when there is a
partition_epoch change.
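For example (a rough sketch with hypothetical names, not the actual consumer
internals), the consumer could simply remember the epochs seen on the fetch
path and attach them when committing:

import java.util.HashMap;
import java.util.Map;

// Rough sketch: remember, per partition, the leader_epoch of the last consumed
// message set and the partition_epoch from the last fetch response, so that
// both can be stored together with the committed offset.
public class EpochTracker {
    private final Map<Integer, Integer> leaderEpochByPartition = new HashMap<>();
    private final Map<Integer, Integer> partitionEpochByPartition = new HashMap<>();

    // Called for each message set returned in a fetch response.
    public void onConsumed(int partition, int leaderEpochOfMessageSet,
                           int partitionEpochInResponse) {
        leaderEpochByPartition.put(partition, leaderEpochOfMessageSet);
        partitionEpochByPartition.put(partition, partitionEpochInResponse);
    }

    // Epochs to commit along with the offset for this partition (-1 if unknown).
    public int leaderEpochToCommit(int partition) {
        return leaderEpochByPartition.getOrDefault(partition, -1);
    }

    public int partitionEpochToCommit(int partition) {
        return partitionEpochByPartition.getOrDefault(partition, -1);
    }
}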

63. My point is that once a leader is producing messages under the new
partition_epoch, ideally, we should associate the new offsets with the new
partition_epoch. Otherwise, the offset_epoch won't be the correct unique
identifier (which is useful for solving the other problems mentioned above).
I was originally thinking that the leader would include the partition_epoch
from the metadata cache in the fetch response. It's just that right now, the
metadata cache is updated on UpdateMetadataRequest, which typically happens
after the LeaderAndIsrRequest. Another approach is for the leader to cache
the partition_epoch in the Partition object and return that (instead of the
one in the metadata cache) in the fetch response.
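A sketch of that second approach (hypothetical method names, not the actual
broker code): the leader records the partition_epoch while handling the
LeaderAndIsrRequest and serves it from the Partition object, so the fetch
response does not depend on the metadata cache having been updated first.

// Rough sketch: cache the partition_epoch on the Partition object when the
// LeaderAndIsrRequest is processed, and read it from there when building the
// fetch response, instead of reading the (possibly stale) metadata cache.
public class PartitionSketch {
    private volatile int partitionEpoch = -1;

    // Invoked while handling the LeaderAndIsrRequest for this partition.
    public void makeLeader(int partitionEpochFromController) {
        this.partitionEpoch = partitionEpochFromController;
        // ... existing become-leader logic would go here ...
    }

    // Used when building the fetch response for this partition.
    public int partitionEpoch() {
        return partitionEpoch;
    }
}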

65. It seems to me that the global_epoch and the partition_epoch have
different purposes. A partition_epoch has the benefit that it (1) can be
used to form a unique identifier for a message and (2) can be used to solve
other corner-case problems in the future. I am not sure that having just a
global_epoch can achieve these. The global_epoch is useful for determining
which version of the metadata is newer, especially with topic deletion.

Thanks,

Jun

On Tue, Jan 9, 2018 at 11:34 PM, Dong Lin <lindon...@gmail.com> wrote:

> Regarding the use of the global epoch in 65), it is very similar to the
> proposal of the metadata_epoch we discussed earlier. The main difference is
> that this epoch is incremented when we create/expand/delete a topic and does
> not change when the controller re-sends metadata.
>
> I looked at our previous discussion. It seems that we prefer the
> partition_epoch over the metadata_epoch because 1) we prefer not to have an
> ever-growing metadata_epoch and 2) we can reset offsets better when a topic
> is re-created. The use of a global topic_epoch avoids the drawback of a
> quickly, ever-growing metadata_epoch. Though the global epoch does not allow
> us to recognize an invalid offset committed before the topic re-creation,
> we can probably just delete the offsets when we delete a topic. Thus I am
> not very sure whether it is still worthwhile to have a per-partition
> partition_epoch if the metadata already has the global epoch.
>
>
> On Tue, Jan 9, 2018 at 6:58 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jun,
> >
> > Thanks so much. These comments are very useful. Please see my comments below.
> >
> > On Mon, Jan 8, 2018 at 5:52 PM, Jun Rao <j...@confluent.io> wrote:
> >
> >> Hi, Dong,
> >>
> >> Thanks for the updated KIP. A few more comments.
> >>
> >> 60. Perhaps having a partition epoch is more flexible since in the
> future,
> >> we may support deleting a partition as well.
> >>
> >
> > Yeah I have considered this. I think we can probably still support
> > deleting a partition by using the topic_epoch -- when a partition of a
> > topic is deleted or created, the epoch of all partitions of this topic
> > will be incremented by 1. Therefore, if that partition is re-created
> > later, the epoch of that partition will still be larger than its epoch
> > before the deletion, which still allows the client to order the metadata
> > for the purpose of this KIP. Does this sound reasonable?
> >
> > The advantage of using the topic_epoch instead of the partition_epoch is
> > that the size of the /brokers/topics/[topic] znode and the
> > request/response size can be smaller. We have a limit on the maximum size
> > of a znode (typically 1MB). Using a partition epoch can effectively
> > reduce the number of partitions that can be described by the
> > /brokers/topics/[topic] znode.
> >
> > One use-case of the partition_epoch is for the client to detect that the
> > committed offset, either from the Kafka offset topic or from an external
> > store, is invalid after partition deletion and re-creation. However, it
> > seems that we can also address this use-case with other approaches. For
> > example, when the AdminClient deletes partitions, it can also delete the
> > committed offsets for those partitions from the offset topic. If the user
> > stores offsets externally, it might make sense for the user to similarly
> > remove the offsets of related partitions after these partitions are
> > deleted. So I am not sure that we should use the partition_epoch in this
> > KIP.
> >
> >
> >>
> >> 61. It seems that the leader epoch returned in the position() call
> >> should be the leader epoch returned in the fetch response, not the one
> >> in the metadata cache of the client.
> >
> >
> > I think this is a good idea. Just to double check: this change does not
> > affect the correctness or performance of this KIP, but it can be useful
> > if we want to use the leader_epoch to better handle the offset reset in
> > case of unclean leader election, which is listed in the future work. Is
> > this understanding correct?
> >
> > I have updated the KIP to specify that the leader_epoch returned by
> > position() should be the largest leader_epoch of those already consumed
> > messages whose offset < position. If no message has been consumed since
> > consumer initialization, the leader_epoch from seek() or
> > OffsetFetchResponse should be used. The offset included in the
> > OffsetCommitRequest will also be determined in a similar manner.
> >
> >
> >>
> >> 62. I am wondering if we should return the partition epoch in the fetch
> >> response as well. In the current proposal, if a topic is recreated and
> the
> >> new leader is on the same broker as the old one, there is nothing to
> force
> >> the metadata refresh in the client. So, the client may still associate
> the
> >> offset with the old partition epoch.
> >>
> >
> > Could you help me understand the problem if a client associates an old
> > partition_epoch (or the topic_epoch as of the current KIP) with the
> > offset? The main purpose of the topic_epoch is to be able to drop the
> > leader_epoch to 0 after a partition is deleted and re-created. I guess
> > you may be thinking about using the partition_epoch to detect that the
> > committed offset is invalid? In that case, I am wondering if the
> > alternative approach described in 60) would be reasonable.
> >
> >
> >>
> >> 63. There is some subtle coordination between the LeaderAndIsrRequest
> and
> >> UpdateMetadataRequest. Currently, when a leader changes, the controller
> >> first sends the LeaderAndIsrRequest to the assigned replicas and the
> >> UpdateMetadataRequest to every broker. So, there could be a small window
> >> when the leader already receives the new partition epoch in the
> >> LeaderAndIsrRequest, but the metadata cache in the broker hasn't been
> >> updated with the latest partition epoch. Not sure what's the best way to
> >> address this issue. Perhaps we can update the metadata cache on the
> broker
> >> with both LeaderAndIsrRequest and UpdateMetadataRequest. The challenge
> is
> >> that the two have slightly different data. For example, only the latter
> >> has
> >> all endpoints.
> >>
> >
> > I am not sure whether this is a problem. Could you explain a bit more
> > what specific problem this small window can cause?
> >
> > Since the client can fetch metadata from any broker in the cluster, and
> > given that different brokers receive requests (e.g. LeaderAndIsrRequest
> > and UpdateMetadataRequest) in arbitrary order, the metadata received by
> > the client can be in arbitrary order (either newer or older) compared to
> > the broker's leadership state, even if a given broker receives the
> > LeaderAndIsrRequest and UpdateMetadataRequest simultaneously. So I am not
> > sure it is useful to update the broker's cache with the
> > LeaderAndIsrRequest.
> >
> >
> >> 64. The enforcement of leader epoch in Offset commit: We allow a
> consumer
> >> to set an arbitrary offset. So it's possible for offsets or leader epoch
> >> to
> >> go backwards. I am not sure if we could always enforce that the leader
> >> epoch only goes up on the broker.
> >>
> >
> > Sure. I have removed this check from the KIP.
> >
> > BTW, we can probably still ensure that the leader_epoch always increases
> > if the leader_epoch used with the offset commit is the max(leader_epoch
> > of the message with offset = the committed offset - 1, the largest known
> > leader_epoch from the metadata). But I don't have a good use-case for
> > this alternative definition, so I chose to keep the KIP simple by not
> > requiring the leader_epoch to always increase.
> >
> >
> >> 65. Good point on handling missing partition epoch due to topic
> deletion.
> >> Another potential way to address this is to additionally propagate the
> >> global partition epoch to brokers and the clients. This way, when a
> >> partition epoch is missing, we can use the global partition epoch to
> >> reason
> >> about which metadata is more recent.
> >>
> >
> > This is a great idea. The global epoch can be used to order the metadata
> > and help us recognize the more recent metadata if a topic (or partition)
> is
> > deleted and re-created.
> >
> > Actually, it seems we only need to propagate the global epoch to brokers
> > and clients without propagating this epoch on a per-topic or
> > per-partition basis. Doing so would simplify the interface changes made
> > in this KIP. Does this approach sound reasonable?
> >
> >
> >> 66. A client may also get an offset by time using the offsetForTimes()
> >> api.
> >> So, we probably want to include offsetInternalMetadata in
> >> OffsetAndTimestamp
> >> as well.
> >>
> >
> > You are right. This probably requires us to change the ListOffsetRequest
> > as well. I will update the KIP after we agree on the solution for 65).
> >
> >
> >>
> >> 67. InternalMetadata can be a bit confusing with the metadata field
> >> already there. Perhaps we can just call it OffsetEpoch. It might be
> >> useful to make OffsetEpoch printable, at least for debugging purposes.
> >> Once you do that, we are already exposing the internal fields. So, not
> >> sure if it's worth hiding them. If we do want to hide them, perhaps we
> >> can have something like the following. The binary encoding is probably
> >> more efficient than JSON for external storage.
> >>
> >> OffsetEpoch {
> >>   static OffsetEpoch decode(byte[]);
> >>
> >>   public byte[] encode();
> >>
> >>   public String toString();
> >> }
> >>
> >
> > Thanks much. I like this solution. I have updated the KIP accordingly.
> >
> >
> >
> >>
> >> Jun
> >>
> >> On Mon, Jan 8, 2018 at 4:22 PM, Dong Lin <lindon...@gmail.com> wrote:
> >>
> >> > Hey Jason,
> >> >
> >> > Certainly. This sounds good. I have updated the KIP to clarify that
> >> > the global epoch will be incremented by 1 each time a topic is
> >> > deleted.
> >> >
> >> > Thanks,
> >> > Dong
> >> >
> >> > On Mon, Jan 8, 2018 at 4:09 PM, Jason Gustafson <ja...@confluent.io>
> >> > wrote:
> >> >
> >> > > Hi Dong,
> >> > >
> >> > >
> >> > > > I think your approach will allow the user to distinguish between
> >> > > > the metadata before and after the topic deletion. I also agree
> >> > > > that this can potentially be useful to the user. I am just not
> >> > > > very sure whether we already have a good use-case to make the
> >> > > > additional complexity worthwhile. It seems that this feature is
> >> > > > kind of independent of the main problem of this KIP. Could we add
> >> > > > this as future work?
> >> > >
> >> > >
> >> > > Do you think it's fair if we bump the topic epoch on deletion and
> >> leave
> >> > > propagation of the epoch for deleted topics for future work? I don't
> >> > think
> >> > > this adds much complexity and it makes the behavior consistent:
> every
> >> > topic
> >> > > mutation results in an epoch bump.
> >> > >
> >> > > Thanks,
> >> > > Jason
> >> > >
> >> > > On Mon, Jan 8, 2018 at 3:14 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> >> > >
> >> > > > Hey Ismael,
> >> > > >
> >> > > > I guess we actually need the user to see this field so that the
> >> > > > user can store this value in the external store together with the
> >> > > > offset. We just prefer the value to be opaque to discourage most
> >> > > > users from interpreting this value. One more advantage of using
> >> > > > such an opaque field is to be able to evolve the information (or
> >> > > > schema) of this value without changing the consumer API in the
> >> > > > future.
> >> > > >
> >> > > > I also think it is probably OK for the user to be able to
> >> > > > interpret this value, particularly for advanced users.
> >> > > >
> >> > > > Thanks,
> >> > > > Dong
> >> > > >
> >> > > > On Mon, Jan 8, 2018 at 2:34 PM, Ismael Juma <ism...@juma.me.uk>
> >> wrote:
> >> > > >
> >> > > > > On Fri, Jan 5, 2018 at 7:15 PM, Jason Gustafson <
> >> ja...@confluent.io>
> >> > > > > wrote:
> >> > > > > >
> >> > > > > > class OffsetAndMetadata {
> >> > > > > >   long offset;
> >> > > > > >   byte[] offsetMetadata;
> >> > > > > >   String metadata;
> >> > > > > > }
> >> > > > >
> >> > > > >
> >> > > > > > Admittedly, the naming is a bit annoying, but we can probably
> >> come
> >> > up
> >> > > > > with
> >> > > > > > something better. Internally the byte array would have a
> >> version.
> >> > If
> >> > > in
> >> > > > > the
> >> > > > > > future we have anything else we need to add, we can update the
> >> > > version
> >> > > > > and
> >> > > > > > we wouldn't need any new APIs.
> >> > > > > >
> >> > > > >
> >> > > > > We can also add fields to a class in a compatible way. So, it
> >> seems
> >> > to
> >> > > me
> >> > > > > that the main advantage of the byte array is that it's opaque to
> >> the
> >> > > > user.
> >> > > > > Is that correct? If so, we could also add any opaque metadata
> in a
> >> > > > subclass
> >> > > > > so that users don't even see it (unless they cast it, but then
> >> > they're
> >> > > on
> >> > > > > their own).
> >> > > > >
> >> > > > > Ismael
> >> > > > >
> >> > > > > The corresponding seek() and position() APIs might look
> something
> >> > like
> >> > > > > this:
> >> > > > > >
> >> > > > > > void seek(TopicPartition partition, long offset, byte[]
> >> > > > offsetMetadata);
> >> > > > > > byte[] positionMetadata(TopicPartition partition);
> >> > > > > >
> >> > > > > > What do you think?
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Jason
> >> > > > > >
> >> > > > > > On Thu, Jan 4, 2018 at 7:04 PM, Dong Lin <lindon...@gmail.com
> >
> >> > > wrote:
> >> > > > > >
> >> > > > > > > Hey Jun, Jason,
> >> > > > > > >
> >> > > > > > > Thanks much for all the feedback. I have updated the KIP
> >> based on
> >> > > the
> >> > > > > > > latest discussion. Can you help check whether it looks good?
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > > Dong
> >> > > > > > >
> >> > > > > > > On Thu, Jan 4, 2018 at 5:36 PM, Dong Lin <
> lindon...@gmail.com
> >> >
> >> > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hey Jun,
> >> > > > > > > >
> >> > > > > > > > Hmm... thinking about this more, I am not sure that the
> >> > proposed
> >> > > > API
> >> > > > > is
> >> > > > > > > > sufficient. For users that store offset externally, we
> >> probably
> >> > > > need
> >> > > > > > > extra
> >> > > > > > > > API to return the leader_epoch and partition_epoch for all
> >> > > > partitions
> >> > > > > > > that
> >> > > > > > > > consumers are consuming. I suppose these users currently
> use
> >> > > > > position()
> >> > > > > > > to
> >> > > > > > > > get the offset. Thus we probably need a new method
> >> > > > > > positionWithEpoch(..)
> >> > > > > > > to
> >> > > > > > > > return <offset, partition_epoch, leader_epoch>. Does this
> >> sound
> >> > > > > > > reasonable?
> >> > > > > > > >
> >> > > > > > > > Thanks,
> >> > > > > > > > Dong
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > On Thu, Jan 4, 2018 at 5:26 PM, Jun Rao <j...@confluent.io
> >
> >> > > wrote:
> >> > > > > > > >
> >> > > > > > > >> Hi, Dong,
> >> > > > > > > >>
> >> > > > > > > >> Yes, that's what I am thinking. OffsetEpoch will be
> >> composed
> >> > of
> >> > > > > > > >> (partition_epoch,
> >> > > > > > > >> leader_epoch).
> >> > > > > > > >>
> >> > > > > > > >> Thanks,
> >> > > > > > > >>
> >> > > > > > > >> Jun
> >> > > > > > > >>
> >> > > > > > > >>
> >> > > > > > > >> On Thu, Jan 4, 2018 at 4:22 PM, Dong Lin <
> >> lindon...@gmail.com
> >> > >
> >> > > > > wrote:
> >> > > > > > > >>
> >> > > > > > > >> > Hey Jun,
> >> > > > > > > >> >
> >> > > > > > > >> > Thanks much. I like the new API that you proposed.
> I
> >> am
> >> > > not
> >> > > > > sure
> >> > > > > > > >> what
> >> > > > > > > >> > you exactly mean by offset_epoch. I suppose that we can
> >> use
> >> > > the
> >> > > > > pair
> >> > > > > > > of
> >> > > > > > > >> > (partition_epoch, leader_epoch) as the offset_epoch,
> >> right?
> >> > > > > > > >> >
> >> > > > > > > >> > Thanks,
> >> > > > > > > >> > Dong
> >> > > > > > > >> >
> >> > > > > > > >> > On Thu, Jan 4, 2018 at 4:02 PM, Jun Rao <
> >> j...@confluent.io>
> >> > > > wrote:
> >> > > > > > > >> >
> >> > > > > > > >> > > Hi, Dong,
> >> > > > > > > >> > >
> >> > > > > > > >> > > Got it. The api that you proposed works. The question
> >> is
> >> > > > whether
> >> > > > > > > >> that's
> >> > > > > > > >> > the
> >> > > > > > > >> > > api that we want to have in the long term. My concern
> >> is
> >> > > that
> >> > > > > > while
> >> > > > > > > >> the
> >> > > > > > > >> > api
> >> > > > > > > >> > > change is simple, the new api seems harder to explain
> >> and
> >> > > use.
> >> > > > > For
> >> > > > > > > >> > example,
> >> > > > > > > >> > > a consumer storing offsets externally now needs to
> call
> >> > > > > > > >> > > waitForMetadataUpdate() after calling seek().
> >> > > > > > > >> > >
> >> > > > > > > >> > > An alternative approach is to make the following
> >> > compatible
> >> > > > api
> >> > > > > > > >> changes
> >> > > > > > > >> > in
> >> > > > > > > >> > > Consumer.
> >> > > > > > > >> > > * Add an additional OffsetEpoch field in
> >> > OffsetAndMetadata.
> >> > > > (no
> >> > > > > > need
> >> > > > > > > >> to
> >> > > > > > > >> > > change the CommitSync() api)
> >> > > > > > > >> > > * Add a new api seek(TopicPartition partition, long
> >> > offset,
> >> > > > > > > >> OffsetEpoch
> >> > > > > > > >> > > offsetEpoch). We can potentially deprecate the old
> api
> >> > > > > > > >> > seek(TopicPartition
> >> > > > > > > >> > > partition, long offset) in the future.
> >> > > > > > > >> > >
> >> > > > > > > >> > > The alternative approach has similar amount of api
> >> changes
> >> > > as
> >> > > > > > yours
> >> > > > > > > >> but
> >> > > > > > > >> > has
> >> > > > > > > >> > > the following benefits.
> >> > > > > > > >> > > 1. The api works in a similar way as how offset
> >> management
> >> > > > works
> >> > > > > > now
> >> > > > > > > >> and
> >> > > > > > > >> > is
> >> > > > > > > >> > > probably what we want in the long term.
> >> > > > > > > >> > > 2. It can reset offsets better when there is data
> loss
> >> due
> >> > > to
> >> > > > > > > unclean
> >> > > > > > > >> > > leader election or correlated replica failure.
> >> > > > > > > >> > > 3. It can reset offsets better when topic is
> recreated.
> >> > > > > > > >> > >
> >> > > > > > > >> > > Thanks,
> >> > > > > > > >> > >
> >> > > > > > > >> > > Jun
> >> > > > > > > >> > >
> >> > > > > > > >> > >
> >> > > > > > > >> > > On Thu, Jan 4, 2018 at 2:05 PM, Dong Lin <
> >> > > lindon...@gmail.com
> >> > > > >
> >> > > > > > > wrote:
> >> > > > > > > >> > >
> >> > > > > > > >> > > > Hey Jun,
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > Yeah I agree that ideally we don't want an ever
> >> growing
> >> > > > global
> >> > > > > > > >> metadata
> >> > > > > > > >> > > > version. I just think it may be more desirable to
> >> keep
> >> > the
> >> > > > > > > consumer
> >> > > > > > > >> API
> >> > > > > > > >> > > > simple.
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > In my current proposal, metadata version returned
> in
> >> the
> >> > > > fetch
> >> > > > > > > >> response
> >> > > > > > > >> > > > will be stored with the offset together. More
> >> > > specifically,
> >> > > > > the
> >> > > > > > > >> > > > metadata_epoch in the new offset topic schema will
> be
> >> > the
> >> > > > > > largest
> >> > > > > > > >> > > > metadata_epoch from all the MetadataResponse and
> >> > > > FetchResponse
> >> > > > > > > ever
> >> > > > > > > >> > > > received by this consumer.
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > We probably don't have to change the consumer API
> for
> >> > > > > > > >> > > > commitSync(Map<TopicPartition, OffsetAndMetadata>).
> >> If
> >> > > user
> >> > > > > > calls
> >> > > > > > > >> > > > commitSync(...) to commit offset 10 for a given
> >> > partition,
> >> > > > for
> >> > > > > > > most
> >> > > > > > > >> > > > use-cases, this consumer instance should have
> >> consumed
> >> > > > message
> >> > > > > > > with
> >> > > > > > > >> > > offset
> >> > > > > > > >> > > > 9 from this partition, in which case the consumer
> can
> >> > > > remember
> >> > > > > > and
> >> > > > > > > >> use
> >> > > > > > > >> > > the
> >> > > > > > > >> > > > metadata_epoch from the corresponding FetchResponse
> >> when
> >> > > > > > > committing
> >> > > > > > > >> > > offset.
> >> > > > > > > >> > > > If user calls commitSync(..) to commit offset 10
> for
> >> a
> >> > > given
> >> > > > > > > >> partition
> >> > > > > > > >> > > > without having consumed the message with offset 9
> >> using
> >> > > this
> >> > > > > > > >> consumer
> >> > > > > > > >> > > > instance, this is probably an advanced use-case. In
> >> this
> >> > > > case
> >> > > > > > the
> >> > > > > > > >> > > advanced
> >> > > > > > > >> > > > user can retrieve the metadata_epoch using the
> newly
> >> > added
> >> > > > > > > >> > > metadataEpoch()
> >> > > > > > > >> > > > API after it fetches the message with offset 9
> >> (probably
> >> > > > from
> >> > > > > > > >> another
> >> > > > > > > >> > > > consumer instance) and encode this metadata_epoch
> in
> >> the
> >> > > > > > > >> > > > string OffsetAndMetadata.metadata. Do you think
> this
> >> > > > solution
> >> > > > > > > would
> >> > > > > > > >> > work?
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > By "not sure that I fully understand your latest
> >> > > > suggestion",
> >> > > > > > are
> >> > > > > > > >> you
> >> > > > > > > >> > > > referring to solution related to unclean leader
> >> election
> >> > > > using
> >> > > > > > > >> > > leader_epoch
> >> > > > > > > >> > > > in my previous email?
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > Thanks,
> >> > > > > > > >> > > > Dong
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > On Thu, Jan 4, 2018 at 1:33 PM, Jun Rao <
> >> > j...@confluent.io
> >> > > >
> >> > > > > > wrote:
> >> > > > > > > >> > > >
> >> > > > > > > >> > > > > Hi, Dong,
> >> > > > > > > >> > > > >
> >> > > > > > > >> > > > > Not sure that I fully understand your latest
> >> > suggestion.
> >> > > > > > > >> Returning an
> >> > > > > > > >> > > > ever
> >> > > > > > > >> > > > > growing global metadata version itself is no
> ideal,
> >> > but
> >> > > is
> >> > > > > > fine.
> >> > > > > > > >> My
> >> > > > > > > >> > > > > question is whether the metadata version returned
> >> in
> >> > the
> >> > > > > fetch
> >> > > > > > > >> > response
> >> > > > > > > >> > > > > needs to be stored with the offset together if
> >> offsets
> >> > > are
> >> > > > > > > stored
> >> > > > > > > >> > > > > externally. If so, we also have to change the
> >> consumer
> >> > > API
> >> > > > > for
> >> > > > > > > >> > > > commitSync()
> >> > > > > > > >> > > > > and need to worry about compatibility. If we
> don't
> >> > store
> >> > > > the
> >> > > > > > > >> metadata
> >> > > > > > > >> > > > > version together with the offset, on a consumer
> >> > restart,
> >> > > > > it's
> >> > > > > > > not
> >> > > > > > > >> > clear
> >> > > > > > > >> > > > how
> >> > > > > > > >> > > > > we can ensure the metadata in the consumer is
> high
> >> > > enough
> >> > > > > > since
> >> > > > > > > >> there
> >> > > > > > > >> > > is
> >> > > > > > > >> > > > no
> >> > > > > > > >> > > > > metadata version to compare with.
> >> > > > > > > >> > > > >
> >> > > > > > > >> > > > > Thanks,
> >> > > > > > > >> > > > >
> >> > > > > > > >> > > > > Jun
> >> > > > > > > >> > > > >
> >> > > > > > > >> > > > >
> >> > > > > > > >> > > > > On Wed, Jan 3, 2018 at 6:43 PM, Dong Lin <
> >> > > > > lindon...@gmail.com
> >> > > > > > >
> >> > > > > > > >> > wrote:
> >> > > > > > > >> > > > >
> >> > > > > > > >> > > > > > Hey Jun,
> >> > > > > > > >> > > > > >
> >> > > > > > > >> > > > > > Thanks much for the explanation.
> >> > > > > > > >> > > > > >
> >> > > > > > > >> > > > > > I understand the advantage of partition_epoch
> >> over
> >> > > > > > > >> metadata_epoch.
> >> > > > > > > >> > My
> >> > > > > > > >> > > > > > current concern is that the use of leader_epoch
> >> and
> >> > > the
> >> > > > > > > >> > > partition_epoch
> >> > > > > > > >> > > > > > requires us considerable change to consumer's
> >> public
> >> > > API
> >> > > > > to
> >> > > > > > > take
> >> > > > > > > >> > care
> >> > > > > > > >> > > > of
> >> > > > > > > >> > > > > > the case where user stores offset externally.
> For
> >> > > > example,
> >> > > > > > > >> > > *consumer*.
> >> > > > > > > >> > > > > > *commitSync*(..) would have to take a map whose
> >> > value
> >> > > is
> >> > > > > > > >> <offset,
> >> > > > > > > >> > > > > metadata,
> >> > > > > > > >> > > > > > leader epoch, partition epoch>.
> >> > *consumer*.*seek*(...)
> >> > > > > would
> >> > > > > > > >> also
> >> > > > > > > >> > > need
> >> > > > > > > >> > > > > > leader_epoch and partition_epoch as parameter.
> >> > > > Technically
> >> > > > > > we
> >> > > > > > > >> can
> >> > > > > > > >> > > > > probably
> >> > > > > > > >> > > > > > still make it work in a backward compatible
> >> manner
> >> > > after
> >> > > > > > > careful
> >> > > > > > > >> > > design
> >> > > > > > > >> > > > > and
> >> > > > > > > >> > > > > > discussion. But these changes can make the
> >> > consumer's
> >> > > > > > > interface
> >> > > > > > > >> > > > > > unnecessarily complex for more users who do not
> >> > store
> >> > > > > offset
> >> > > > > > > >> > > > externally.
> >> > > > > > > >> > > > > >
> >> > > > > > > >> > > > > > After thinking more about it, we can address
> all
> >> > > > problems
> >> > > > > > > >> discussed
> >> > > > > > > >> > > by
> >> > > > > > > >> > > > > only
> >> > > > > > > >> > > > > > using the metadata_epoch without introducing
> >> > > > leader_epoch
> >> > > > > or
> >> > > > > > > the
> >> > > > > > > >> > > > > > partition_epoch. The current KIP describes the
> >> > changes
> >> > > > to
> >> > > > > > the
> >> > > > > > > >> > > consumer
> >> > > > > > > >> > > > > API
> >> > > > > > > >> > > > > > and how the new API can be used if user stores
> >> > offset
> >> > > > > > > >> externally.
> >> > > > > > > >> > In
> >> > > > > > > >> > > > > order
> >> > > > > > > >> > > > > > to address the scenario you described earlier,
> we
> >> > can
> >> > > > > > include
> >> > > > > > > >> > > > > > metadata_epoch in the FetchResponse and the
> >> > > > > > > LeaderAndIsrRequest.
> >> > > > > > > >> > > > Consumer
> >> > > > > > > >> > > > > > remembers the largest metadata_epoch from all
> the
> >> > > > > > > FetchResponse
> >> > > > > > > >> it
> >> > > > > > > >> > > has
> >> > > > > > > >> > > > > > received. The metadata_epoch committed with the
> >> > > offset,
> >> > > > > > either
> >> > > > > > > >> > within
> >> > > > > > > >> > > > or
> >> > > > > > > >> > > > > > outside Kafka, should be the largest
> >> metadata_epoch
> >> > > > across
> >> > > > > > all
> >> > > > > > > >> > > > > > FetchResponse and MetadataResponse ever
> received
> >> by
> >> > > this
> >> > > > > > > >> consumer.
> >> > > > > > > >> > > > > >
> >> > > > > > > >> > > > > > The drawback of using only the metadata_epoch
> is
> >> > that
> >> > > we
> >> > > > > can
> >> > > > > > > not
> >> > > > > > > >> > > always
> >> > > > > > > >> > > > > do
> >> > > > > > > >> > > > > > the smart offset reset in case of unclean
> leader
> >> > > > election
> >> > > > > > > which
> >> > > > > > > >> you
> >> > > > > > > >> > > > > > mentioned earlier. But in most case, unclean
> >> leader
> >> > > > > election
> >> > > > > > > >> > probably
> >> > > > > > > >> > > > > > happens when consumer is not
> >> rebalancing/restarting.
> >> > > In
> >> > > > > > these
> >> > > > > > > >> > cases,
> >> > > > > > > >> > > > > either
> >> > > > > > > >> > > > > > consumer is not directly affected by unclean
> >> leader
> >> > > > > election
> >> > > > > > > >> since
> >> > > > > > > >> > it
> >> > > > > > > >> > > > is
> >> > > > > > > >> > > > > > not consuming from the end of the log, or
> >> consumer
> >> > can
> >> > > > > > derive
> >> > > > > > > >> the
> >> > > > > > > >> > > > > > leader_epoch from the most recent message
> >> received
> >> > > > before
> >> > > > > it
> >> > > > > > > >> sees
> >> > > > > > > >> > > > > > OffsetOutOfRangeException. So I am not sure it
> is
> >> > > worth
> >> > > > > > adding
> >> > > > > > > >> the
> >> > > > > > > >> > > > > > leader_epoch to consumer API to address the
> >> > remaining
> >> > > > > corner
> >> > > > > > > >> case.
> >> > > > > > > >> > > What
> >> > > > > > > >> > > > > do
> >> > > > > > > >> > > > > > you think?
> >> > > > > > > >> > > > > >
> >> > > > > > > >> > > > > > Thanks,
> >> > > > > > > >> > > > > > Dong
> >> > > > > > > >> > > > > >
> >> > > > > > > >> > > > > >
> >> > > > > > > >> > > > > >
> >> > > > > > > >> > > > > > On Tue, Jan 2, 2018 at 6:28 PM, Jun Rao <
> >> > > > j...@confluent.io
> >> > > > > >
> >> > > > > > > >> wrote:
> >> > > > > > > >> > > > > >
> >> > > > > > > >> > > > > > > Hi, Dong,
> >> > > > > > > >> > > > > > >
> >> > > > > > > >> > > > > > > Thanks for the reply.
> >> > > > > > > >> > > > > > >
> >> > > > > > > >> > > > > > > To solve the topic recreation issue, we could
> >> use
> >> > > > > either a
> >> > > > > > > >> global
> >> > > > > > > >> > > > > > metadata
> >> > > > > > > >> > > > > > > version or a partition level epoch. But
> either
> >> one
> >> > > > will
> >> > > > > > be a
> >> > > > > > > >> new
> >> > > > > > > >> > > > > concept,
> >> > > > > > > >> > > > > > > right? To me, the latter seems more natural.
> It
> >> > also
> >> > > > > makes
> >> > > > > > > it
> >> > > > > > > >> > > easier
> >> > > > > > > >> > > > to
> >> > > > > > > >> > > > > > > detect if a consumer's offset is still valid
> >> > after a
> >> > > > > topic
> >> > > > > > > is
> >> > > > > > > >> > > > > recreated.
> >> > > > > > > >> > > > > > As
> >> > > > > > > >> > > > > > > you pointed out, we don't need to store the
> >> > > partition
> >> > > > > > epoch
> >> > > > > > > in
> >> > > > > > > >> > the
> >> > > > > > > >> > > > > > message.
> >> > > > > > > >> > > > > > > The following is what I am thinking. When a
> >> > > partition
> >> > > > is
> >> > > > > > > >> created,
> >> > > > > > > >> > > we
> >> > > > > > > >> > > > > can
> >> > > > > > > >> > > > > > > assign a partition epoch from an
> >> ever-increasing
> >> > > > global
> >> > > > > > > >> counter
> >> > > > > > > >> > and
> >> > > > > > > >> > > > > store
> >> > > > > > > >> > > > > > > it in /brokers/topics/[topic]/
> >> > > > partitions/[partitionId]
> >> > > > > in
> >> > > > > > > ZK.
> >> > > > > > > >> > The
> >> > > > > > > >> > > > > > > partition
> >> > > > > > > >> > > > > > > epoch is propagated to every broker. The
> >> consumer
> >> > > will
> >> > > > > be
> >> > > > > > > >> > tracking
> >> > > > > > > >> > > a
> >> > > > > > > >> > > > > > tuple
> >> > > > > > > >> > > > > > > of <offset, leader epoch, partition epoch>
> for
> >> > > > offsets.
> >> > > > > > If a
> >> > > > > > > >> > topic
> >> > > > > > > >> > > is
> >> > > > > > > >> > > > > > > recreated, it's possible that a consumer's
> >> offset
> >> > > and
> >> > > > > > leader
> >> > > > > > > >> > epoch
> >> > > > > > > >> > > > > still
> >> > > > > > > >> > > > > > > match that in the broker, but partition epoch
> >> > won't
> >> > > > be.
> >> > > > > In
> >> > > > > > > >> this
> >> > > > > > > >> > > case,
> >> > > > > > > >> > > > > we
> >> > > > > > > >> > > > > > > can potentially still treat the consumer's
> >> offset
> >> > as
> >> > > > out
> >> > > > > > of
> >> > > > > > > >> range
> >> > > > > > > >> > > and
> >> > > > > > > >> > > > > > reset
> >> > > > > > > >> > > > > > > the offset based on the offset reset policy
> in
> >> the
> >> > > > > > consumer.
> >> > > > > > > >> This
> >> > > > > > > >> > > > seems
> >> > > > > > > >> > > > > > > harder to do with a global metadata version.
> >> > > > > > > >> > > > > > >
> >> > > > > > > >> > > > > > > Jun
> >> > > > > > > >> > > > > > >
> >> > > > > > > >> > > > > > >
> >> > > > > > > >> > > > > > >
> >> > > > > > > >> > > > > > > On Mon, Dec 25, 2017 at 6:56 AM, Dong Lin <
> >> > > > > > > >> lindon...@gmail.com>
> >> > > > > > > >> > > > wrote:
> >> > > > > > > >> > > > > > >
> >> > > > > > > >> > > > > > > > Hey Jun,
> >> > > > > > > >> > > > > > > >
> >> > > > > > > >> > > > > > > > This is a very good example. After thinking
> >> > > through
> >> > > > > this
> >> > > > > > > in
> >> > > > > > > >> > > > detail, I
> >> > > > > > > >> > > > > > > agree
> >> > > > > > > >> > > > > > > > that we need to commit offset with leader
> >> epoch
> >> > in
> >> > > > > order
> >> > > > > > > to
> >> > > > > > > >> > > address
> >> > > > > > > >> > > > > > this
> >> > > > > > > >> > > > > > > > example.
> >> > > > > > > >> > > > > > > >
> >> > > > > > > >> > > > > > > > I think the remaining question is how to
> >> address
> >> > > the
> >> > > > > > > >> scenario
> >> > > > > > > >> > > that
> >> > > > > > > >> > > > > the
> >> > > > > > > >> > > > > > > > topic is deleted and re-created. One
> possible
> >> > > > solution
> >> > > > > > is
> >> > > > > > > to
> >> > > > > > > >> > > commit
> >> > > > > > > >> > > > > > > offset
> >> > > > > > > >> > > > > > > > with both the leader epoch and the metadata
> >> > > version.
> >> > > > > The
> >> > > > > > > >> logic
> >> > > > > > > >> > > and
> >> > > > > > > >> > > > > the
> >> > > > > > > >> > > > > > > > implementation of this solution does not
> >> > require a
> >> > > > new
> >> > > > > > > >> concept
> >> > > > > > > >> > > > (e.g.
> >> > > > > > > >> > > > > > > > partition epoch) and it does not require
> any
> >> > > change
> >> > > > to
> >> > > > > > the
> >> > > > > > > >> > > message
> >> > > > > > > >> > > > > > format
> >> > > > > > > >> > > > > > > > or leader epoch. It also allows us to order
> >> the
> >> > > > > metadata
> >> > > > > > > in
> >> > > > > > > >> a
> >> > > > > > > >> > > > > > > > straightforward manner which may be useful
> in
> >> > the
> >> > > > > > future.
> >> > > > > > > >> So it
> >> > > > > > > >> > > may
> >> > > > > > > >> > > > > be
> >> > > > > > > >> > > > > > a
> >> > > > > > > >> > > > > > > > better solution than generating a random
> >> > partition
> >> > > > > epoch
> >> > > > > > > >> every
> >> > > > > > > >> > > time
> >> > > > > > > >> > > > > we
> >> > > > > > > >> > > > > > > > create a partition. Does this sound
> >> reasonable?
> >> > > > > > > >> > > > > > > >
> >> > > > > > > >> > > > > > > > Previously one concern with using the
> >> metadata
> >> > > > version
> >> > > > > > is
> >> > > > > > > >> that
> >> > > > > > > >> > > > > consumer
> >> > > > > > > >> > > > > > > > will be forced to refresh metadata even if
> >> > > metadata
> >> > > > > > > version
> >> > > > > > > >> is
> >> > > > > > > >> > > > > > increased
> >> > > > > > > >> > > > > > > > due to topics that the consumer is not
> >> > interested
> >> > > > in.
> >> > > > > > Now
> >> > > > > > > I
> >> > > > > > > >> > > > realized
> >> > > > > > > >> > > > > > that
> >> > > > > > > >> > > > > > > > this is probably not a problem. Currently
> >> client
> >> > > > will
> >> > > > > > > >> refresh
> >> > > > > > > >> > > > > metadata
> >> > > > > > > >> > > > > > > > either due to InvalidMetadataException in
> the
> >> > > > response
> >> > > > > > > from
> >> > > > > > > >> > > broker
> >> > > > > > > >> > > > or
> >> > > > > > > >> > > > > > due
> >> > > > > > > >> > > > > > > > to metadata expiry. The addition of the
> >> metadata
> >> > > > > version
> >> > > > > > > >> should
> >> > > > > > > >> > > > > > increase
> >> > > > > > > >> > > > > > > > the overhead of metadata refresh caused by
> >> > > > > > > >> > > > InvalidMetadataException.
> >> > > > > > > >> > > > > If
> >> > > > > > > >> > > > > > > > client refresh metadata due to expiry and
> it
> >> > > > receives
> >> > > > > a
> >> > > > > > > >> > metadata
> >> > > > > > > >> > > > > whose
> >> > > > > > > >> > > > > > > > version is lower than the current metadata
> >> > > version,
> >> > > > we
> >> > > > > > can
> >> > > > > > > >> > reject
> >> > > > > > > >> > > > the
> >> > > > > > > >> > > > > > > > metadata but still reset the metadata age,
> >> which
> >> > > > > > > essentially
> >> > > > > > > >> > keep
> >> > > > > > > >> > > > the
> >> > > > > > > >> > > > > > > > existing behavior in the client.
> >> > > > > > > >> > > > > > > >
> >> > > > > > > >> > > > > > > > Thanks much,
> >> > > > > > > >> > > > > > > > Dong
> >> > > > > > > >> > > > > > > >
> >> > > > > > > >> > > > > > >
> >> > > > > > > >> > > > > >
> >> > > > > > > >> > > > >
> >> > > > > > > >> > > >
> >> > > > > > > >> > >
> >> > > > > > > >> >
> >> > > > > > > >>
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>
