Hey Jun,

I have updated the KIP based on our discussion. Thanks!

Dong

On Sat, Dec 9, 2017 at 10:12 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Thanks much for your comments. Given that the client needs to deserialize
> the metadata anyway, the extra overhead of checking the per-partition
> version for every partition should not be a big concern. Thus it makes
> sense to use the leader epoch as the per-partition version instead of
> creating a global metadata version. I will update the KIP to do that.
>
> Regarding the detection of outdated metadata, I think it is possible to
> ensure that the client gets the latest metadata by fetching it from the
> controller. Note that this requires extra logic in the controller so that
> the controller updates its metadata directly in memory without requiring an
> UpdateMetadataRequest. But I am not sure what the main motivation for this
> would be at the moment, and it would make the controller more of a
> bottleneck in the cluster, which we probably want to avoid.
>
> I think we can probably keep the current way of ensuring metadata
> freshness. Currently the client is forced to refresh metadata if the broker
> returns an error (e.g. NotLeaderForPartition) due to outdated metadata or
> if the metadata does not contain the partition that the client needs. In
> the future, as you previously suggested, we can include the per-partition
> leaderEpoch in the FetchRequest/ProduceRequest so that the broker can
> return an error if the epoch is smaller than the epoch cached on the
> broker. Given that this adds more complexity to Kafka, I think we can
> probably think about that later when we have a specific use case or
> problem to solve with up-to-date metadata. Does this sound OK?
>
> Thanks,
> Dong
>
>
>
> On Fri, Dec 8, 2017 at 3:53 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Dong,
>>
>> Thanks for the reply. A few more points below.
>>
>> Regarding how to prevent a consumer from switching from a new leader to an
>> old leader, your suggestion to refresh metadata on consumer restart until
>> it sees a metadata version >= the one associated with the offset works
>> too, as long as we guarantee that the cached metadata versions on the
>> brokers only go up.
>>
>> The second discussion point is whether the metadata versioning should be
>> per partition or global. For the partition-level versioning, you were
>> concerned about the performance. Given that metadata updates are rare, I
>> am not sure that is a big concern though. Doing a million if-tests is
>> probably going to take less than 1ms. Another thing is that the metadata
>> version seems to need to survive controller failover. In your current
>> approach, a consumer may not be able to wait on the right version of the
>> metadata after a restart, since the metadata version may have been
>> recycled on the server side due to a controller failover while the
>> consumer was down. The partition-level leaderEpoch survives controller
>> failover and won't have this issue.
>>
>> Lastly, neither your proposal nor mine addresses how to guarantee that a
>> consumer detects that its metadata is outdated. Currently, the consumer is
>> not guaranteed to fetch metadata from every broker within some bounded
>> period of time. Maybe this is out of the scope of your KIP, but one idea
>> is to force the consumer to refresh metadata from the controller
>> periodically.
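>>
>> To make that concrete, here is a minimal sketch of what I have in mind;
>> the class and the refreshFromController runnable are purely illustrative,
>> not an existing client API:
>>
>>   import java.util.concurrent.Executors;
>>   import java.util.concurrent.ScheduledExecutorService;
>>   import java.util.concurrent.TimeUnit;
>>
>>   class PeriodicControllerRefresh {
>>       private final ScheduledExecutorService scheduler =
>>               Executors.newSingleThreadScheduledExecutor();
>>
>>       // Schedule a refresh at a fixed interval so that stale metadata is
>>       // detected within a known, bounded time window.
>>       void start(Runnable refreshFromController, long intervalMs) {
>>           scheduler.scheduleAtFixedRate(refreshFromController, intervalMs,
>>                   intervalMs, TimeUnit.MILLISECONDS);
>>       }
>>   }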
>>
>> Jun
>>
>>
>> On Thu, Dec 7, 2017 at 11:25 AM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hey Jun,
>> >
>> > Thanks much for the comments. Great point, particularly regarding (3).
>> > I haven't thought about this before.
>> >
>> > It seems that there are two possible ways the version number can be
>> > used. One solution is for the client to check the version number at the
>> > time it receives a MetadataResponse. If the version number in the
>> > MetadataResponse is smaller than the version number in the client's
>> > cache, the client will be forced to fetch metadata again. Another
>> > solution, as you have suggested, is for the broker to check the version
>> > number when it receives a request from the client. The broker will
>> > reject the request if the version is smaller than the version in the
>> > broker's cache.
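>> >
>> > To illustrate the first (client-side) option, here is a rough sketch
>> > with made-up names; it is not the actual client code, just the shape of
>> > the check:
>> >
>> >   class ClientMetadataCache {
>> >       private long cachedMetadataVersion = -1L;
>> >
>> >       // Returns true if the response was accepted; false means the
>> >       // response is older than what we already have and the client
>> >       // should fetch metadata again.
>> >       synchronized boolean maybeUpdate(long responseMetadataVersion) {
>> >           if (responseMetadataVersion < cachedMetadataVersion) {
>> >               return false;
>> >           }
>> >           cachedMetadataVersion = responseMetadataVersion;
>> >           return true;
>> >       }
>> >   }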
>> >
>> > I am not very sure that the second solution can address the problem
>> > here. In the scenario described in the JIRA ticket, the broker's cache
>> > may be outdated because it has not processed the LeaderAndIsrRequest
>> > from the controller. Thus it may still process the client's request even
>> > though the version in the client's request is actually outdated. Does
>> > this make sense?
>> >
>> > IMO, it seems that we can address problem (3) by saving the metadata
>> > version together with the offset. After the consumer starts, it will
>> > keep fetching metadata until the metadata version is >= the version
>> > saved with the offset of this partition.
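>> >
>> > As a rough illustration (the names here are hypothetical, not the real
>> > consumer API), the committed offset would carry the metadata version,
>> > and after a restart the consumer would keep refreshing metadata until
>> > its cached version catches up:
>> >
>> >   class OffsetWithVersion {
>> >       final long offset;
>> >       final long metadataVersion;  // version saved together with the offset
>> >
>> >       OffsetWithVersion(long offset, long metadataVersion) {
>> >           this.offset = offset;
>> >           this.metadataVersion = metadataVersion;
>> >       }
>> >   }
>> >
>> >   interface MetadataFetcher {
>> >       long fetchMetadataVersion() throws InterruptedException;  // one metadata round trip
>> >   }
>> >
>> >   class StartupMetadataWait {
>> >       // Keep refreshing metadata after startup until the cluster metadata
>> >       // is at least as new as what was saved with the committed offset.
>> >       static long awaitVersion(MetadataFetcher fetcher, OffsetWithVersion committed)
>> >               throws InterruptedException {
>> >           long version = fetcher.fetchMetadataVersion();
>> >           while (version < committed.metadataVersion) {
>> >               Thread.sleep(100);  // back off before refreshing again
>> >               version = fetcher.fetchMetadataVersion();
>> >           }
>> >           return version;
>> >       }
>> >   }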
>> >
>> > Regarding problems (1) and (2): Currently we use the version number in
>> > the MetadataResponse to ensure that the metadata does not go back in
>> > time. There are two alternative solutions to address problems (1) and
>> > (2). One solution is for the client to enumerate all partitions in the
>> > MetadataResponse, compare their epochs with those in the cached
>> > metadata, and reject the MetadataResponse iff any leader epoch is
>> > smaller. The main concern is that the MetadataResponse currently
>> > contains information about all partitions in the entire cluster, so it
>> > may slow down the client if we were to do this. The other solution is
>> > for the client to enumerate partitions for only the topics registered in
>> > org.apache.kafka.clients.Metadata, which will be an empty set for the
>> > producer and the set of subscribed partitions for the consumer. But this
>> > degrades to all topics if the consumer subscribes to topics in the
>> > cluster by pattern.
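>> >
>> > To make the comparison concrete, here is a sketch of the per-partition
>> > check with purely illustrative types; the maps would cover either all
>> > partitions or only the registered topics, depending on which of the two
>> > solutions we pick:
>> >
>> >   import java.util.Map;
>> >
>> >   class PartitionEpochCheck {
>> >       // Returns true if the new MetadataResponse should be rejected as
>> >       // stale, i.e. some partition's leader epoch went backwards
>> >       // relative to the cached metadata.
>> >       static boolean isStale(Map<String, Integer> cachedEpochs,
>> >                              Map<String, Integer> responseEpochs) {
>> >           for (Map.Entry<String, Integer> entry : responseEpochs.entrySet()) {
>> >               Integer cached = cachedEpochs.get(entry.getKey());
>> >               if (cached != null && entry.getValue() < cached) {
>> >                   return true;
>> >               }
>> >           }
>> >           return false;
>> >       }
>> >   }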
>> >
>> > Note that the client will only be forced to update metadata if the
>> > version in the MetadataResponse is smaller than the version in the
>> > cached metadata. In general this should not be a problem. It can be a
>> > problem only if some broker is much slower than the other brokers in
>> > processing UpdateMetadataRequest. When this is the case, it means that
>> > the broker is also much slower in processing LeaderAndIsrRequest, which
>> > can cause problems anyway because some partitions will probably have no
>> > leader during this period. I am not sure that problems (1) and (2) cause
>> > more trouble than what we already have.
>> >
>> > Thanks,
>> > Dong
>> >
>> >
>> > On Wed, Dec 6, 2017 at 6:42 PM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Hi, Dong,
>> > >
>> > > Great finding on the issue. It's a real problem. A few comments about
>> > > the KIP. (1) I am not sure about updating controller_metadata_epoch on
>> > > every UpdateMetadataRequest. Currently, the controller can send an
>> > > UpdateMetadataRequest when there is no actual metadata change. Doing
>> > > this may cause an unnecessary metadata refresh on the client. (2)
>> > > controller_metadata_epoch is global across all topics. This means that
>> > > a client may be forced to update its metadata even when the metadata
>> > > for the topics that it cares about hasn't changed. (3) It doesn't seem
>> > > that the KIP handles the corner case when a consumer is restarted. Say
>> > > a consumer reads from the new leader, commits the offset and then is
>> > > restarted. On restart, the consumer gets outdated metadata and fetches
>> > > from the old leader. Then, the consumer will run into the
>> > > offset-out-of-range issue.
>> > >
>> > > Given the above, I am thinking of the following approach. We actually
>> > > already have metadata versioning at the partition level. Each leader
>> > > has a leader epoch which is monotonically increasing. We can
>> > > potentially propagate the leader epoch back in the metadata response
>> > > and the clients can cache that. This solves issues (1) and (2). To
>> > > solve (3), when saving an offset, we could save both the offset and
>> > > the corresponding leader epoch. When fetching the data, the consumer
>> > > provides both the offset and the leader epoch. A leader will only
>> > > serve the request if its leader epoch is equal to or greater than the
>> > > leader epoch from the consumer. To achieve this, we need to change the
>> > > fetch request protocol and the offset commit API, which requires some
>> > > more thought.
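>> > >
>> > > To sketch the idea (names are illustrative only, not a concrete
>> > > protocol or API change), the committed value would carry the leader
>> > > epoch alongside the offset, and the leader would compare it against
>> > > its own epoch before serving a fetch:
>> > >
>> > >   class CommittedOffset {
>> > >       final long offset;
>> > >       final int leaderEpoch;  // epoch of the leader the offset was read from
>> > >
>> > >       CommittedOffset(long offset, int leaderEpoch) {
>> > >           this.offset = offset;
>> > >           this.leaderEpoch = leaderEpoch;
>> > >       }
>> > >   }
>> > >
>> > >   class PartitionLeader {
>> > >       private volatile int leaderEpoch;
>> > >
>> > >       // Serve the fetch only if this replica's epoch is >= the epoch
>> > >       // the consumer committed; otherwise the consumer is talking to
>> > >       // an old leader and should refresh its metadata instead of
>> > >       // resetting the offset.
>> > >       boolean canServeFetch(CommittedOffset position) {
>> > >           return leaderEpoch >= position.leaderEpoch;
>> > >       }
>> > >   }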
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Wed, Dec 6, 2017 at 10:57 AM, Dong Lin <lindon...@gmail.com> wrote:
>> > >
>> > > > Bump up the thread.
>> > > >
>> > > > It will be great to have more comments on whether we should do it or
>> > > > whether there is a better way to address the motivation of this KIP.
>> > > >
>> > > > On Mon, Dec 4, 2017 at 3:09 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > >
>> > > > > I don't have an interesting rejected alternative solution to put
>> > > > > in the KIP. If there is a good alternative solution from anyone in
>> > > > > this thread, I am happy to discuss it and update the KIP
>> > > > > accordingly.
>> > > > >
>> > > > > Thanks,
>> > > > > Dong
>> > > > >
>> > > > > On Mon, Dec 4, 2017 at 1:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>> > > > >
>> > > > >> It is clearer now.
>> > > > >>
>> > > > >> I noticed that the Rejected Alternatives section is empty.
>> > > > >> Have you considered any alternatives?
>> > > > >>
>> > > > >> Cheers
>> > > > >>
>> > > > >> On Mon, Dec 4, 2017 at 1:07 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > >>
>> > > > >> > Ted, thanks for catching this. I have updated the sentence to
>> > > > >> > make it readable.
>> > > > >> >
>> > > > >> > Thanks,
>> > > > >> > Dong
>> > > > >> >
>> > > > >> > On Sat, Dec 2, 2017 at 3:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>> > > > >> >
>> > > > >> > > bq. It the controller_epoch of the incoming MetadataResponse,
>> > > > >> > > or if the controller_epoch is the same but the
>> > > > >> > > controller_metadata_epoch
>> > > > >> > >
>> > > > >> > > Can you update the above sentence so that the intention is
>> > > > >> > > clearer?
>> > > > >> > >
>> > > > >> > > Thanks
>> > > > >> > >
>> > > > >> > > On Fri, Dec 1, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > >> > >
>> > > > >> > > > Hi all,
>> > > > >> > > >
>> > > > >> > > > I have created KIP-232: Detect outdated metadata by adding
>> > > > >> > > > ControllerMetadataEpoch field:
>> > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+by+adding+ControllerMetadataEpoch+field
>> > > > >> > > > .
>> > > > >> > > >
>> > > > >> > > > The KIP proposes to add fields in MetadataResponse and
>> > > > >> > > > UpdateMetadataRequest so that the client can reject outdated
>> > > > >> > > > metadata and avoid unnecessary OffsetOutOfRangeExceptions.
>> > > > >> > > > Otherwise there is currently a race condition that can cause
>> > > > >> > > > the consumer to reset its offset, which negatively affects
>> > > > >> > > > the consumer's availability.
>> > > > >> > > >
>> > > > >> > > > Feedback and suggestions are welcome!
>> > > > >> > > >
>> > > > >> > > > Regards,
>> > > > >> > > > Dong
>> > > > >> > > >
>> > > > >> > >
>> > > > >> >
>> > > > >>
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
