Hi, Dong,

My previous reply has a couple of typos. So, I am sending it again with the fixes.

Consider the following scenario. In metadata v1, the leader for a partition
is at broker 1. In metadata v2, leader is at broker 2. In metadata v3,
leader is at broker 1 again. The last committed offsets in v1, v2 and v3 are
10, 20 and 30, respectively. A consumer is started and reads metadata v1
and reads messages from offset 0 to 25 from broker 1. My understanding is
that in the current proposal, the metadata version associated with offset
25 is v1. The consumer is then restarted and fetches metadata v2. The
consumer tries to read from broker 2, which is the old leader with the last
offset at 20. In this case, the consumer will still get
OffsetOutOfRangeException incorrectly.
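
To make the failure concrete, here is a small Python model of the scenario
above. It is only a sketch: the dictionaries and the function name are made up
for illustration and are not Kafka APIs, and it assumes the consumer simply
refuses to use metadata older than the version stored with its offset.

```python
# Hypothetical model of the scenario; none of these names are Kafka APIs.
LEADER_BY_VERSION = {1: 1, 2: 2, 3: 1}   # metadata version -> leader broker id
LAST_OFFSET = {1: 30, 2: 20}             # broker id -> last offset in its log


def fetch_after_restart(offset, stored_version, fetched_version):
    """Outcome of the first fetch a restarted consumer issues.

    stored_version is the metadata version saved together with the offset;
    fetched_version is the version the consumer happens to get on restart.
    """
    if fetched_version < stored_version:
        # Metadata is older than what the offset was saved with: keep refreshing.
        return "refresh metadata"
    leader = LEADER_BY_VERSION[fetched_version]
    if offset > LAST_OFFSET[leader]:
        return "OffsetOutOfRange"
    return "ok"


# Offset 25 tagged with the cached version v1: v2 is accepted, broker 2 is stale.
print(fetch_after_restart(25, stored_version=1, fetched_version=2))
# Offset 25 tagged with the publish-time version v3: v2 is rejected instead.
print(fetch_after_restart(25, stored_version=3, fetched_version=2))
```

In this model, tagging the offset with the version in which it was published
is what stops the consumer from going to broker 2 at all.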

Fundamentally, it seems that the metadata version that we want to associate
with the offset should be the version when the data is published, not the
one that the consumer caches. In the above example, the metadata version
when offset 25 is published is v3. If the consumer uses that version, it
will be able to avoid the above issue. So, ideally, the metadata version
should probably be stored with the message. We already store leader epoch
in each message set. So, reusing that will be the most convenient. It is
true that leader epoch is reset to 0 after the topic is re-created. To
address that issue, we can probably store a partition epoch in each message
set. The partition epoch can be generated randomly every time a partition
is created. If we want to avoid message format change, we can potentially
reserve the first byte of leader epoch as the partition epoch.
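
As a rough illustration of the last idea, the Python sketch below packs a
one-byte partition epoch into the top byte of a 32-bit value and keeps the
remaining 24 bits for the leader epoch. The 32-bit width, the 8/24 split and
all names here are assumptions made for the example, not an existing format.

```python
import random

LEADER_BITS = 24                       # assumed: low 24 bits hold the leader epoch
LEADER_MASK = (1 << LEADER_BITS) - 1   # 0x00FFFFFF


def new_partition_epoch():
    """Pick a random one-byte partition epoch when a partition is created."""
    return random.randrange(256)


def pack(partition_epoch, leader_epoch):
    """Combine both epochs into one unsigned 32-bit value."""
    if not 0 <= partition_epoch < 256:
        raise ValueError("partition epoch must fit in one byte")
    if not 0 <= leader_epoch <= LEADER_MASK:
        raise ValueError("leader epoch must fit in 24 bits")
    return (partition_epoch << LEADER_BITS) | leader_epoch


def unpack(combined):
    """Split a packed value back into (partition_epoch, leader_epoch)."""
    return combined >> LEADER_BITS, combined & LEADER_MASK


packed = pack(0xAB, 5)
print(hex(packed), unpack(packed))
```

With a single random byte, a re-created partition would get the same partition
epoch as the old one with probability 1/256, and the leader epoch would be
limited to 24 bits, so this is a trade-off against changing the message format.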

As I mentioned earlier, using leader epoch also has the benefit that it can
better handle the other corner case when existing messages are lost due to
multiple replica failures (e.g. power outage) or unclean leader election.
Suppose in the above example, if the messages from offset 22 to 30 are lost
and republished with new messages at metadata v4, the consumer will know
that offset 25 is not valid since it's associated with v3. In this case,
the consumer can potentially reset its offset to the last offset in v3,
which is 22. This way, the consumer can re-consume all the republished
messages.
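
The reset described above could look roughly like the Python sketch below. It
assumes the leader can report, for each epoch, the offset at which that epoch
started, and it treats 22 as the exclusive end of v3 after the data loss. The
function names and the list layout are hypothetical.

```python
def end_offset_for_epoch(epoch_starts, log_end, epoch):
    """Exclusive end offset of the given epoch.

    epoch_starts is a list of (epoch, start_offset) pairs sorted by epoch.
    The end of an epoch is the start offset of the next larger epoch, or the
    log end offset if there is no larger epoch.
    """
    for entry_epoch, start_offset in epoch_starts:
        if entry_epoch > epoch:
            return start_offset
    return log_end


def validate_position(epoch_starts, log_end, offset, epoch):
    """Return the offset the consumer should resume from."""
    end = end_offset_for_epoch(epoch_starts, log_end, epoch)
    # If the stored offset lies beyond the end of its epoch, the data it
    # pointed at was replaced, so fall back to the end of that epoch.
    return min(offset, end)


# After the loss: v3 now ends at 22 and v4 starts there; the log ends at 31.
epoch_starts = [(1, 0), (2, 11), (3, 21), (4, 22)]
print(validate_position(epoch_starts, 31, offset=25, epoch=3))
```

An offset whose epoch still covers it is returned unchanged, so the check only
changes the position in the data-loss case.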

Thanks,

Jun

On Fri, Dec 22, 2017 at 5:04 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Dong,
>
> Thanks for the updated KIP. Still have some questions on the latest
> approach in the KIP.
>
> Consider the following scenario. In metadata v1, the leader for a
> partition is at broker 1. In metadata v2, leader is at broker 2. In
> metadata v3, leader is at broker 1 again. The last committed offset in v1,
> v2 and v3 are 10, 20 and 30, respectively. A consumer is started and reads
> metadata v1 and reads messages from offset 0 to 25 from broker 1. My
> understanding is that in the current proposal, the metadata version
> associated with offset 25 is v6. The consumer is then restarted and fetches
> metadata v7. The consumer tries to read from broker 2, which is the old
> leader with the last offset at 20. In this case, the consumer will still
> get OffsetOutOfRangeException incorrectly.
>
> Fundamentally, it seems that the metadata version that we want to
> associate with the offset should be the version when the data is published,
> not the one that the consumer caches. In the above example, the metadata
> version when offset 25 is published is v3. If the consumer uses that
> version, it will be able to avoid the above issue. So, the metadata version
> ideally probably should be stored with the message. We already store leader
> epoch in each message set. So, reusing that will be the most convenient. It
> is true that leader epoch is reset to 0 after the topic is re-created. To
> address that issue, we can probably store a partition epoch in each message
> set. The partition epoch can be generated randomly every time a partition
> is created. If we want to avoid message format change, we can potentially
> reserve the first byte of leader epoch as the partition epoch.
>
> As I mentioned earlier, using leader epoch also has the benefit that it
> can better handle the other corner case when existing message is lost due
> to multiple replica failure (e.g. power outage) or unclean leader election.
> Suppose in the above example, if the messages from offset 22 to 30 are lost
> and republished with new messages at metadata v4, the consumer will know
> that offset 25 is not valid since it's associated with v3. In this case,
> the consumer can potentially reset its offset to the last offset in v3,
> which is 22. This way, the consumer can re-consume all the republished
> messages.
>
> Thanks,
>
> Jun
>
>
> On Thu, Dec 21, 2017 at 2:46 PM, Dong Lin <lindon...@gmail.com> wrote:
>
>> Hey Jun,
>>
>> Thanks much for your comments. Yeah I have not considered the case where
>> the offset is stored externally.
>>
>> Based on Jason's question, I think we probably have to use a global
>> metadata_epoch. And since we have a global metadata_epoch, this KIP
>> probably no longer needs the per-partition leader_epoch. Then we can use
>> two newly-added APIs in consumer that allow user to get the metadata_epoch
>> from consumer and wait for consumer to receive MetadataResponse whose
>> metadata_epoch >= the given metadata_epoch. These two APIs should address
>> the case where user stored offset externally. I have updated the KIP
>> accordingly. Could you take another look?
>>
>> Thanks for all the comments.
>>
>> Dong
>>
>>
>> On Tue, Dec 19, 2017 at 3:09 PM, Jun Rao <j...@confluent.io> wrote:
>>
>> > Hi, Dong,
>> >
>> > Thanks for the reply.
>> >
>> > 10. I was actually just thinking the case when the consumer consumes old
>> > data. If the current leader epoch is 3 and the consumer is consuming
>> > records generated in leader epoch 1, the epoch associated with the
>> offset
>> > should be 1. However, as you pointed out, the fetch response currently
>> > includes the leader epoch for fetched data. So, this is already covered.
>> >
>> > 11. That's an interesting thought. What about the case when the offsets
>> are
>> > stored externally? When we restart a consumer and seek to an externally
>> > stored offset, we won't know the leader epoch in the consumer. Do we
>> need
>> > another request to retrieve the leader epoch based on an offset and make
>> > sure the info is up to date? Another related thing is that the leader
>> epoch
>> > that we want to associate the offset with ideally should be the epoch
>> when
>> > the data is fetched. For example, when all replicas lost data due to a
>> > power failure or when there is an unclean leader election, the leader
>> epoch
>> > for a given offset may change over time on the broker. In those cases, a
>> > consumer's offset may be in range, but is not in the same leader epoch
>> for
>> > the time when the data is fetched. We can potentially do a smarter
>> offset
>> > reset in those cases if we remember the epoch when the data is fetched.
>> >
>> > Jun
>> >
>> >
>> >
>> > On Mon, Dec 18, 2017 at 1:58 PM, Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > > Hey Jun,
>> > >
>> > > Thanks much for your comments. These are very thoughtful ideas. Please
>> > see
>> > > my comments below.
>> > >
>> > > On Thu, Dec 14, 2017 at 6:38 PM, Jun Rao <j...@confluent.io> wrote:
>> > >
>> > > > Hi, Dong,
>> > > >
>> > > > Thanks for the update. A few more comments below.
>> > > >
>> > > > 10. It seems that we need to return the leader epoch in the fetch
>> > > response
>> > > > as well. When fetching data, we could be fetching data from a leader
>> > epoch
>> > > > older than what's returned in the metadata response. So, we want to
>> use
>> > > the
>> > > > leader epoch associated with the offset being fetched for committing
>> > > > offsets.
>> > > >
>> > >
>> > > It seems that we may have two separate issues here. The first issue is
>> > that
>> > > consumer uses metadata that is older than the one it used before. The
>> > > second issue is that consumer uses metadata which is newer than the
>> > > corresponding leader epoch in the leader broker. We know that the
>> > > OffsetOutOfRangeException described in this KIP can be prevented by
>> > > avoiding the first issue. On the other hand, it seems that the
>> > > OffsetOutOfRangeException can still happen even if we avoid the
>> > > second issue -- if consumer uses an older version of metadata, the
>> leader
>> > > epoch in its metadata may equal the leader epoch in the broker even if
>> > the
>> > > leader epoch in the broker is outdated.
>> > >
>> > > Given this understanding, I am not sure why we need to return the
>> leader
>> > > epoch in the fetch response. As long as consumer's metadata is not
>> going
>> > > back in version, I think we are good. Did I miss something here?
>> > >
>> > >
>> > > >
>> > > > 11. Should we now extend OffsetAndMetadata used in the offset commit
>> > api
>> > > in
>> > > > KafkaConsumer to include leader epoch? Similarly, should we return
>> > leader
>> > > > epoch in endOffsets(), beginningOffsets() and position()? We
>> probably
>> > > need
>> > > > to think about how to make the api backward compatible.
>> > > >
>> > >
>> > > After thinking through this carefully, I think we probably don't want
>> to
>> > > extend OffsetAndMetadata to include leader epoch because leader epoch
>> is
>> > > kind of implementation detail which ideally should be hidden from
>> user.
>> > The
>> > > consumer can include leader epoch in the OffsetCommitRequest after
>> taking
>> > > offset from commitSync(final Map<TopicPartition, OffsetAndMetadata>
>> > > offsets). Similarly consumer can store leader epoch from
>> > > OffsetFetchResponse and only provide offset to user via
>> > > consumer.committed(topicPartition). This solution seems to work well
>> and
>> > > we
>> > > don't have to make changes to consumer's public API. Does this sound
>> OK?
>> > >
>> > >
>> > > >
>> > > > 12. It seems that we now need to store leader epoch in the offset
>> > topic.
>> > > > Could you include the new schema for the value of the offset topic
>> and
>> > > add
>> > > > upgrade notes?
>> > >
>> > >
>> > > You are right. I have updated the KIP to specify the new schema for
>> the
>> > > value of the offset topic. Can you take another look?
>> > >
>> > > For existing messages in the offset topic, leader_epoch will be
>> missing.
>> > We
>> > > will use leader_epoch = -1 to indicate the missing leader_epoch. Then
>> the
>> > > consumer behavior will be the same as it is now because any
>> leader_epoch
>> > in
>> > > the MetadataResponse will be larger than the leader_epoch = -1 in the
>> > > OffsetFetchResponse. Thus we don't need specific procedure for upgrades
>> > due
>> > > to this change in the offset topic schema. By "upgrade notes", do you
>> > mean
>> > > the sentences we need to include in the upgrade.html in the PR later?
>> > >
>> > >
>> > > >
>> > > > Jun
>> > > >
>> > > >
>> > > > On Tue, Dec 12, 2017 at 5:19 PM, Dong Lin <lindon...@gmail.com>
>> wrote:
>> > > >
>> > > > > Hey Jun,
>> > > > >
>> > > > > I see. Sounds good. Yeah it is probably simpler to leave this to
>> > > another
>> > > > > KIP in the future.
>> > > > >
>> > > > > Thanks for all the comments. Since there is no further comment in
>> the
>> > > > > community, I will open the voting thread.
>> > > > >
>> > > > > Thanks,
>> > > > > Dong
>> > > > >
>> > > > > On Mon, Dec 11, 2017 at 5:37 PM, Jun Rao <j...@confluent.io>
>> wrote:
>> > > > >
>> > > > > > Hi, Dong,
>> > > > > >
>> > > > > > The case that I am thinking is network partitioning. Suppose one
>> > > > deploys
>> > > > > a
>> > > > > > stretched cluster across multiple AZs in the same region. If the
>> > > > machines
>> > > > > > in one AZ can't communicate to brokers in other AZs due to a
>> > network
>> > > > > issue,
>> > > > > > the brokers in that AZ won't get any new metadata.
>> > > > > >
>> > > > > > We can potentially solve this problem by requiring some kind of
>> > > regular
>> > > > > > heartbeats between the controller and the broker. This may need
>> > some
>> > > > more
>> > > > > > thoughts. So, it's probably fine to leave this to another KIP in
>> > the
>> > > > > > future.
>> > > > > >
>> > > > > > Thanks,
>> > > > > >
>> > > > > > Jun
>> > > > > >
>> > > > > > On Mon, Dec 11, 2017 at 2:55 PM, Dong Lin <lindon...@gmail.com>
>> > > wrote:
>> > > > > >
>> > > > > > > Hey Jun,
>> > > > > > >
>> > > > > > > Thanks for the comment. I am open to improve this KIP to
>> address
>> > > more
>> > > > > > > problems. I probably need more help in understanding what is
>> the
>> > > > > current
>> > > > > > > problem with consumer using outdated metadata and whether it
>> is
>> > > > easier
>> > > > > to
>> > > > > > > address it together with this KIP.
>> > > > > > >
>> > > > > > > I agree that a consumer can potentially talk to old leader
>> for a
>> > > long
>> > > > > > time
>> > > > > > > even after this KIP. But after this KIP, the consumer probably
>> > > should
>> > > > > not
>> > > > > > > get OffsetOutOfRangeException and therefore will not cause
>> offset
>> > > > rewind
>> > > > > > > issue. So the only problem is that consumer will not be able
>> to
>> > > fetch
>> > > > > > data
>> > > > > > > until it has updated metadata. It seems that this situation
>> can
>> > > only
>> > > > > > happen
>> > > > > > > if the broker is too slow in processing LeaderAndIsrRequest
>> since
>> > > > > > otherwise
>> > > > > > > the consumer will be forced to update metadata due to
>> > > > > > > NotLeaderForPartitionException. So the problem we are having
>> > here
>> > > is
>> > > > > > that
>> > > > > > > consumer will not be able to fetch data if some broker is too
>> > slow
>> > > in
>> > > > > > > processing LeaderAndIsrRequest.
>> > > > > > >
>> > > > > > > Because Kafka propagates LeaderAndIsrRequest asynchronously to
>> > all
>> > > > > > brokers
>> > > > > > > in the cluster, there will always be a period of time when
>> > consumer
>> > > > can
>> > > > > > not
>> > > > > > > fetch data for the partition during the leadership change.
>> Thus
>> > it
>> > > > > seems
>> > > > > > > more like a broker-side performance issue instead of
>> client-side
>> > > > > > > correctness issue. My gut feel is that it is not causing as
>> much of a
>> > > > > problem
>> > > > > > > as the problem to be fixed in this KIP. And if we were to
>> address
>> > > it,
>> > > > > we
>> > > > > > > probably need to make change in the broker side, e.g. with
>> > > > prioritized
>> > > > > > > queue for controller-related requests, which may be kind of
>> > > > orthogonal
>> > > > > to
>> > > > > > > this KIP. I am not very sure it will be easier to address it
>> with
>> > > the
>> > > > > > > change in this KIP. Do you have any recommendation?
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Dong
>> > > > > > >
>> > > > > > >
>> > > > > > > On Mon, Dec 11, 2017 at 1:51 PM, Jun Rao <j...@confluent.io>
>> > wrote:
>> > > > > > >
>> > > > > > > > Hi, Dong,
>> > > > > > > >
>> > > > > > > > Thanks for the reply.
>> > > > > > > >
>> > > > > > > > My suggestion of forcing the metadata refresh from the
>> > controller
>> > > > may
>> > > > > > not
>> > > > > > > > work in general since the cached controller could be
>> outdated
>> > > too.
>> > > > > The
>> > > > > > > > general problem is that if a consumer's metadata is
>> outdated,
>> > it
>> > > > may
>> > > > > > get
>> > > > > > > > stuck with the old leader for a long time. We can address
>> the
>> > > issue
>> > > > > of
>> > > > > > > > detecting outdated metadata in a separate KIP in the future
>> if
>> > > you
>> > > > > > didn't
>> > > > > > > > intend to address it in this KIP.
>> > > > > > > >
>> > > > > > > > Thanks,
>> > > > > > > >
>> > > > > > > > Jun
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Sat, Dec 9, 2017 at 10:12 PM, Dong Lin <
>> lindon...@gmail.com
>> > >
>> > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hey Jun,
>> > > > > > > > >
>> > > > > > > > > Thanks much for your comments. Given that client needs to
>> > > > > > de-serialize
>> > > > > > > > the
>> > > > > > > > > metadata anyway, the extra overhead of checking the
>> > > per-partition
>> > > > > > > version
>> > > > > > > > > for every partition should not be a big concern. Thus it
>> > makes
>> > > > > sense
>> > > > > > to
>> > > > > > > > use
>> > > > > > > > > leader epoch as the per-partition version instead of
>> > creating a
>> > > > > > global
>> > > > > > > > > metadata version. I will update the KIP to do that.
>> > > > > > > > >
>> > > > > > > > > Regarding the detection of outdated metadata, I think it
>> is
>> > > > > possible
>> > > > > > to
>> > > > > > > > > ensure that client gets latest metadata by fetching from
>> > > > > controller.
>> > > > > > > Note
>> > > > > > > > > that this requires extra logic in the controller such that
>> > > > > controller
>> > > > > > > > > updates metadata directly in memory without requiring
>> > > > > > > > > UpdateMetadataRequest. But I am not sure the main
>> motivation
>> > of
>> > > > > this
>> > > > > > at
>> > > > > > > > > this moment. But this makes controller more like a
>> bottleneck
>> > > in
>> > > > > the
>> > > > > > > > > cluster which we probably want to avoid.
>> > > > > > > > >
>> > > > > > > > > I think we can probably keep the current way of ensuring
>> > > metadata
>> > > > > > > > > freshness. Currently client will be forced to refresh
>> > metadata
>> > > if
>> > > > > > > broker
>> > > > > > > > > returns error (e.g. NotLeaderForPartition) due to outdated
>> > > > metadata
>> > > > > > or
>> > > > > > > if
>> > > > > > > > > the metadata does not contain the partition that the
>> client
>> > > > needs.
>> > > > > In
>> > > > > > > the
>> > > > > > > > > future, as you previously suggested, we can include
>> > > per-partition
>> > > > > > > > > leaderEpoch in the FetchRequest/ProduceRequest such that
>> > broker
>> > > > can
>> > > > > > > > return
>> > > > > > > > > error if the epoch is smaller than cached epoch in the
>> > broker.
>> > > > > Given
>> > > > > > > that
>> > > > > > > > > this adds more complexity to Kafka, I think we can
>> probably
>> > > think
>> > > > > > about
>> > > > > > > > > that later when we have a specific use-case or problem to
>> > > solve
>> > > > > with
>> > > > > > > > > up-to-date metadata. Does this sound OK?
>> > > > > > > > >
>> > > > > > > > > Thanks,
>> > > > > > > > > Dong
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Fri, Dec 8, 2017 at 3:53 PM, Jun Rao <j...@confluent.io
>> >
>> > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi, Dong,
>> > > > > > > > > >
>> > > > > > > > > > Thanks for the reply. A few more points below.
>> > > > > > > > > >
>> > > > > > > > > > For dealing with how to prevent a consumer switching
>> from a
>> > > new
>> > > > > > > leader
>> > > > > > > > to
>> > > > > > > > > > an old leader, your suggestion that refreshes metadata on
>> > > > consumer
>> > > > > > > > restart
>> > > > > > > > > > until it sees a metadata version >= the one associated
>> with
>> > > the
>> > > > > > > offset
>> > > > > > > > > > works too, as long as we guarantee that the cached
>> metadata
>> > > > > > versions
>> > > > > > > on
>> > > > > > > > > the
>> > > > > > > > > > brokers only go up.
>> > > > > > > > > >
>> > > > > > > > > > The second discussion point is on whether the metadata
>> > > > versioning
>> > > > > > > > should
>> > > > > > > > > be
>> > > > > > > > > > per partition or global. For the partition level
>> > versioning,
>> > > > you
>> > > > > > were
>> > > > > > > > > > concerned about the performance. Given that metadata
>> > updates
>> > > > are
>> > > > > > > rare,
>> > > > > > > > I
>> > > > > > > > > am
>> > > > > > > > > > not sure if it's a big concern though. Doing a million
>> if
>> > > tests
>> > > > > is
>> > > > > > > > > probably
>> > > > > > > > > > going to take less than 1ms. Another thing is that the
>> > > metadata
>> > > > > > > version
>> > > > > > > > > > seems to need to survive controller failover. In your
>> > current
>> > > > > > > > approach, a
>> > > > > > > > > > consumer may not be able to wait on the right version of
>> > the
>> > > > > > metadata
>> > > > > > > > > after
>> > > > > > > > > > the consumer restart since the metadata version may have
>> > been
>> > > > > > > recycled
>> > > > > > > > on
>> > > > > > > > > > the server side due to a controller failover while the
>> > > consumer
>> > > > > is
>> > > > > > > > down.
>> > > > > > > > > > The partition level leaderEpoch survives controller
>> failure
>> > > and
>> > > > > > won't
>> > > > > > > > > have
>> > > > > > > > > > this issue.
>> > > > > > > > > >
>> > > > > > > > > > Lastly, neither your proposal nor mine addresses the
>> issue
>> > > how
>> > > > to
>> > > > > > > > > guarantee
>> > > > > > > > > > a consumer to detect that its metadata is outdated.
>> > Currently,
>> > > > the
>> > > > > > > > > consumer
>> > > > > > > > > > is not guaranteed to fetch metadata from every broker
>> > within
>> > > > some
>> > > > > > > > bounded
>> > > > > > > > > > period of time. Maybe this is out of the scope of your
>> KIP.
>> > > But
>> > > > > one
>> > > > > > > > idea
>> > > > > > > > > is
>> > > > > > > > > > to force the consumer to refresh metadata from the
>> controller
>> > > > > > > > periodically.
>> > > > > > > > > >
>> > > > > > > > > > Jun
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Thu, Dec 7, 2017 at 11:25 AM, Dong Lin <
>> > > lindon...@gmail.com
>> > > > >
>> > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hey Jun,
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks much for the comments. Great point particularly
>> > > > > regarding
>> > > > > > > > (3). I
>> > > > > > > > > > > haven't thought about this before.
>> > > > > > > > > > >
>> > > > > > > > > > > It seems that there are two possible ways where the
>> > version
>> > > > > > number
>> > > > > > > > can
>> > > > > > > > > be
>> > > > > > > > > > > used. One solution is for client to check the version
>> > > number
>> > > > at
>> > > > > > the
>> > > > > > > > > time
>> > > > > > > > > > it
>> > > > > > > > > > > receives MetadataResponse. And if the version number
>> in
>> > the
>> > > > > > > > > > > MetadataResponse is smaller than the version number in
>> > the
>> > > > > > client's
>> > > > > > > > > > cache,
>> > > > > > > > > > > the client will be forced to fetch metadata again.
>> > Another
>> > > > > > > solution,
>> > > > > > > > > as
>> > > > > > > > > > > you have suggested, is for broker to check the version
>> > > number
>> > > > > at
>> > > > > > > the
>> > > > > > > > > time
>> > > > > > > > > > > it receives a request from client. The broker will
>> reject
>> > > the
>> > > > > > > request
>> > > > > > > > > if
>> > > > > > > > > > > the version is smaller than the version in broker's
>> > cache.
>> > > > > > > > > > >
>> > > > > > > > > > > I am not very sure that the second solution can
>> address
>> > the
>> > > > > > problem
>> > > > > > > > > here.
>> > > > > > > > > > > In the scenario described in the JIRA ticket, broker's
>> > > cache
>> > > > > may
>> > > > > > be
>> > > > > > > > > > > outdated because it has not processed the
>> > > LeaderAndIsrRequest
>> > > > > > from
>> > > > > > > > the
>> > > > > > > > > > > controller. Thus it may still process client's request
>> > even
>> > > > if
>> > > > > > the
>> > > > > > > > > > version
>> > > > > > > > > > > in client's request is actually outdated. Does this
>> make
>> > > > sense?
>> > > > > > > > > > >
>> > > > > > > > > > > IMO, it seems that we can address problem (3) by
>> saving
>> > the
>> > > > > > > metadata
>> > > > > > > > > > > version together with the offset. After consumer
>> starts,
>> > it
>> > > > > will
>> > > > > > > keep
>> > > > > > > > > > > fetching metadata until the metadata version >= the
>> > version
>> > > > > saved
>> > > > > > > > with
>> > > > > > > > > > the
>> > > > > > > > > > > offset of this partition.
>> > > > > > > > > > >
>> > > > > > > > > > > Regarding problems (1) and (2): Currently we use the
>> > > version
>> > > > > > number
>> > > > > > > > in
>> > > > > > > > > > the
>> > > > > > > > > > > MetadataResponse to ensure that the metadata does not
>> go
>> > > back
>> > > > > in
>> > > > > > > > time.
>> > > > > > > > > > > There are two alternative solutions to address
>> problems
>> > (1)
>> > > > and
>> > > > > > > (2).
>> > > > > > > > > One
>> > > > > > > > > > > solution is for client to enumerate all partitions in
>> the
>> > > > > > > > > > MetadataResponse,
>> > > > > > > > > > > compare their epoch with those in the cached metadata,
>> > and
>> > > > > > reject
>> > > > > > > > the
>> > > > > > > > > > > MetadataResponse iff any leader epoch is smaller. The
>> > main
>> > > > > > concern
>> > > > > > > is
>> > > > > > > > > > that
>> > > > > > > > > > > MetadataResponse currently contains information of all
>> > > > partitions
>> > > > > > in
>> > > > > > > > the
>> > > > > > > > > > > entire cluster. It may slow down client's performance
>> if
>> > we
>> > > > > were
>> > > > > > to
>> > > > > > > > do
>> > > > > > > > > > it.
>> > > > > > > > > > > The other solution is for client to enumerate
>> partitions
>> > > for
>> > > > > only
>> > > > > > > > > topics
>> > > > > > > > > > > registered in the org.apache.kafka.clients.Metadata,
>> > which
>> > > > > will
>> > > > > > be
>> > > > > > > > an
>> > > > > > > > > > > empty
>> > > > > > > > > > > set for producer and the set of subscribed partitions
>> for
>> > > > > > consumer.
>> > > > > > > > But
>> > > > > > > > > > > this degrades to all topics if consumer subscribes to
>> > > topics
>> > > > in
>> > > > > > the
>> > > > > > > > > > cluster
>> > > > > > > > > > > by pattern.
>> > > > > > > > > > >
>> > > > > > > > > > > Note that client will only be forced to update
>> metadata
>> > if
>> > > > the
>> > > > > > > > version
>> > > > > > > > > in
>> > > > > > > > > > > the MetadataResponse is smaller than the version in
>> the
>> > > > cached
>> > > > > > > > > metadata.
>> > > > > > > > > > In
>> > > > > > > > > > > general it should not be a problem. It can be a
>> problem
>> > > only
>> > > > if
>> > > > > > > some
>> > > > > > > > > > broker
>> > > > > > > > > > > is particularly slower than other brokers in
>> processing
>> > > > > > > > > > > UpdateMetadataRequest. When this is the case, it means
>> > that
>> > > > the
>> > > > > > > > broker
>> > > > > > > > > is
>> > > > > > > > > > > also particularly slower in processing
>> > LeaderAndIsrRequest,
>> > > > > which
>> > > > > > > can
>> > > > > > > > > > cause
>> > > > > > > > > > > problem anyway because some partition will probably
>> have
>> > no
>> > > > > > leader
>> > > > > > > > > during
>> > > > > > > > > > > this period. I am not sure problems (1) and (2) cause
>> > more
>> > > > > > problem
>> > > > > > > > than
>> > > > > > > > > > > what we already have.
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks,
>> > > > > > > > > > > Dong
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Wed, Dec 6, 2017 at 6:42 PM, Jun Rao <
>> > j...@confluent.io>
>> > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi, Dong,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Great finding on the issue. It's a real problem. A
>> few
>> > > > > comments
>> > > > > > > > about
>> > > > > > > > > > the
>> > > > > > > > > > > > KIP. (1) I am not sure about updating
>> > > > > controller_metadata_epoch
>> > > > > > > on
>> > > > > > > > > > every
>> > > > > > > > > > > > UpdateMetadataRequest. Currently, the controller can
>> > send
>> > > > > > > > > > > > UpdateMetadataRequest when there is no actual
>> metadata
>> > > > > change.
>> > > > > > > > Doing
>> > > > > > > > > > this
>> > > > > > > > > > > > may require unnecessary metadata refresh on the
>> client.
>> > > (2)
>> > > > > > > > > > > > controller_metadata_epoch is global across all
>> topics.
>> > > This
>> > > > > > means
>> > > > > > > > > that
>> > > > > > > > > > a
>> > > > > > > > > > > > client may be forced to update its metadata even
>> when
>> > the
>> > > > > > > metadata
>> > > > > > > > > for
>> > > > > > > > > > > the
>> > > > > > > > > > > > topics that it cares about hasn't changed. (3) It doesn't
>> > seem
>> > > > > that
>> > > > > > > the
>> > > > > > > > > KIP
>> > > > > > > > > > > > handles the corner case when a consumer is
>> restarted.
>> > > Say a
>> > > > > > > > consumer
>> > > > > > > > > > > reads
>> > > > > > > > > > > > from the new leader, commits the offset and then is
>> > > > > restarted.
>> > > > > > On
>> > > > > > > > > > > restart,
>> > > > > > > > > > > > the consumer gets outdated metadata and fetches
>> from
>> > > the
>> > > > > old
>> > > > > > > > > leader.
>> > > > > > > > > > > > Then, the consumer will get into the offset out of
>> > range
>> > > > > issue.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Given the above, I am thinking of the following
>> > approach.
>> > > > We
>> > > > > > > > actually
>> > > > > > > > > > > > already have metadata versioning at the partition
>> > level.
>> > > > Each
>> > > > > > > > leader
>> > > > > > > > > > has
>> > > > > > > > > > > a
>> > > > > > > > > > > > leader epoch which is monotonically increasing. We
>> can
>> > > > > > > potentially
>> > > > > > > > > > > > propagate leader epoch back in the metadata response
>> > and
>> > > > the
>> > > > > > > > clients
>> > > > > > > > > > can
>> > > > > > > > > > > > cache that. This solves the issue of (1) and (2). To
>> > > solve
>> > > > > (3),
>> > > > > > > > when
>> > > > > > > > > > > saving
>> > > > > > > > > > > > an offset, we could save both an offset and the
>> > > > corresponding
>> > > > > > > > leader
>> > > > > > > > > > > epoch.
>> > > > > > > > > > > > When fetching the data, the consumer provides both
>> the
>> > > > offset
>> > > > > > and
>> > > > > > > > the
>> > > > > > > > > > > > leader epoch. A leader will only serve the request
>> if
>> > its
>> > > > > > leader
>> > > > > > > > > epoch
>> > > > > > > > > > is
>> > > > > > > > > > > > equal to or greater than the leader epoch from the
>> > > > consumer.
>> > > > > To
>> > > > > > > > > achieve
>> > > > > > > > > > > > this, we need to change the fetch request protocol
>> and
>> > > the
>> > > > > > offset
>> > > > > > > > > > commit
>> > > > > > > > > > > > api, which requires some more thoughts.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Jun
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Wed, Dec 6, 2017 at 10:57 AM, Dong Lin <
>> > > > > lindon...@gmail.com
>> > > > > > >
>> > > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Bump up the thread.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > It will be great to have more comments on whether
>> we
>> > > > should
>> > > > > > do
>> > > > > > > it
>> > > > > > > > > or
>> > > > > > > > > > > > > whether there is better way to address the
>> motivation
>> > > of
>> > > > > this
>> > > > > > > > KIP.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Mon, Dec 4, 2017 at 3:09 PM, Dong Lin <
>> > > > > > lindon...@gmail.com>
>> > > > > > > > > > wrote:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > I don't have an interesting rejected alternative
>> > > > solution
>> > > > > > to
>> > > > > > > > put
>> > > > > > > > > in
>> > > > > > > > > > > the
>> > > > > > > > > > > > > > KIP. If there is good alternative solution from
>> > > anyone
>> > > > in
>> > > > > > > this
>> > > > > > > > > > > thread,
>> > > > > > > > > > > > I
>> > > > > > > > > > > > > am
>> > > > > > > > > > > > > > happy to discuss this and update the KIP
>> > accordingly.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > Dong
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > On Mon, Dec 4, 2017 at 1:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >> It is clearer now.
>> > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > >> I noticed that the Rejected Alternatives section is empty.
>> > > > > > > > > > > > > > >> Have you considered any alternatives?
>> > > > > > > > > > > > > >>
>> > > > > > > > > > > > > >> Cheers
>> > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > >> On Mon, Dec 4, 2017 at 1:07 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > > > > > > >>
>> > > > > > > > > > > > > > >> > Ted, thanks for catching this. I have updated the sentence to make it
>> > > > > > > > > > > > > > >> > readable.
>> > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > >> > Thanks,
>> > > > > > > > > > > > > >> > Dong
>> > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > On Sat, Dec 2, 2017 at 3:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>> > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > > >> > > bq. It the controller_epoch of the incoming MetadataResponse, or if the
>> > > > > > > > > > > > > > >> > > controller_epoch is the same but the controller_metadata_epoch
>> > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > >> > > Can you update the above sentence so that the intention is clearer?
>> > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > >> > > Thanks
>> > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > > >> > > On Fri, Dec 1, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > >> > > > Hi all,
>> > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > >> > > > I have created KIP-232: Detect outdated metadata by adding
>> > > > > > > > > > > > > > >> > > > ControllerMetadataEpoch field:
>> > > > > > > > > > > > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+by+adding+ControllerMetadataEpoch+field
>> > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > > >> > > > The KIP proposes to add fields to MetadataResponse and
>> > > > > > > > > > > > > > >> > > > UpdateMetadataRequest so that clients can reject outdated metadata and
>> > > > > > > > > > > > > > >> > > > avoid unnecessary OffsetOutOfRangeException. Without this, there is
>> > > > > > > > > > > > > > >> > > > currently a race condition that can cause the consumer to reset its
>> > > > > > > > > > > > > > >> > > > offset, which negatively affects the consumer's availability.
>> > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > >> > > > Feedback and suggestions are welcome!
>> > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > >> > > > Regards,
>> > > > > > > > > > > > > >> > > > Dong
>> > > > > > > > > > > > > >> > > >
>> > > > > > > > > > > > > >> > >
>> > > > > > > > > > > > > >> >
>> > > > > > > > > > > > > >>
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
