Hey Jun,

I have updated the KIP based on our discussion. Thanks!
Dong

On Sat, Dec 9, 2017 at 10:12 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Thanks much for your comments. Given that the client needs to de-serialize the metadata anyway, the extra overhead of checking the per-partition version for every partition should not be a big concern. Thus it makes sense to use the leader epoch as the per-partition version instead of creating a global metadata version. I will update the KIP to do that.
>
> Regarding the detection of outdated metadata, I think it is possible to ensure that the client gets the latest metadata by fetching from the controller. Note that this requires extra logic in the controller so that the controller updates metadata directly in memory without requiring an UpdateMetadataRequest. But I am not sure what the main motivation for this would be at the moment, and it makes the controller more of a bottleneck in the cluster, which we probably want to avoid.
>
> I think we can probably keep the current way of ensuring metadata freshness. Currently the client is forced to refresh metadata if the broker returns an error (e.g. NotLeaderForPartition) due to outdated metadata, or if the metadata does not contain the partition that the client needs. In the future, as you previously suggested, we can include the per-partition leaderEpoch in the FetchRequest/ProduceRequest so that the broker can return an error if the epoch is smaller than the epoch cached in the broker. Given that this adds more complexity to Kafka, I think we can probably think about that later, when we have a specific use-case or problem to solve with up-to-date metadata. Does this sound OK?
>
> Thanks,
> Dong
>
> On Fri, Dec 8, 2017 at 3:53 PM, Jun Rao <j...@confluent.io> wrote:
>
>> Hi, Dong,
>>
>> Thanks for the reply. A few more points below.
>>
>> For dealing with how to prevent a consumer switching from a new leader to an old leader, your suggestion of refreshing metadata on consumer restart until the consumer sees a metadata version >= the one associated with the offset works too, as long as we guarantee that the cached metadata versions on the brokers only go up.
>>
>> The second discussion point is whether the metadata versioning should be per partition or global. For the partition-level versioning, you were concerned about the performance. Given that metadata updates are rare, I am not sure it's a big concern though. Doing a million if tests is probably going to take less than 1 ms. Another thing is that the metadata version seems to need to survive controller failover. In your current approach, a consumer may not be able to wait on the right version of the metadata after a restart, since the metadata version may have been recycled on the server side due to a controller failover while the consumer was down. The partition-level leaderEpoch survives controller failure and won't have this issue.
>>
>> Lastly, neither your proposal nor mine addresses how to guarantee that a consumer detects that its metadata is outdated. Currently, the consumer is not guaranteed to fetch metadata from every broker within some bounded period of time. Maybe this is out of the scope of your KIP, but one idea is to force the consumer to refresh metadata from the controller periodically.
>>
>> Jun
>>
>> On Thu, Dec 7, 2017 at 11:25 AM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hey Jun,
>> >
>> > Thanks much for the comments. Great point, particularly regarding (3). I haven't thought about this before.
>> >
>> > It seems that there are two possible ways the version number can be used. One solution is for the client to check the version number at the time it receives a MetadataResponse: if the version number in the MetadataResponse is smaller than the version number in the client's cache, the client is forced to fetch metadata again. The other solution, as you have suggested, is for the broker to check the version number at the time it receives a request from the client, and reject the request if the version is smaller than the version in the broker's cache.
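To make the first option concrete, here is a rough sketch of the client-side check, using the per-partition leader epoch as the version. The class and the string-keyed map below are only illustrative, not the actual client internals:

    import java.util.HashMap;
    import java.util.Map;

    public class MetadataEpochCache {

        // Last known leader epoch per partition, e.g. "orders-0" -> 5. A plain
        // String key keeps the sketch self-contained; the real client would
        // key by TopicPartition.
        private final Map<String, Integer> leaderEpochs = new HashMap<>();

        // Returns true if the response was applied, or false if it was
        // rejected as stale and the caller should fetch metadata again.
        public synchronized boolean maybeApply(Map<String, Integer> responseEpochs) {
            for (Map.Entry<String, Integer> entry : responseEpochs.entrySet()) {
                Integer cached = leaderEpochs.get(entry.getKey());
                // Reject the whole response iff any leader epoch went backwards.
                if (cached != null && entry.getValue() < cached) {
                    return false;
                }
            }
            leaderEpochs.putAll(responseEpochs);
            return true;
        }
    }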
>> > I am not very sure that the second solution can address the problem here. In the scenario described in the JIRA ticket, the broker's cache may be outdated because it has not yet processed the LeaderAndIsrRequest from the controller. Thus it may still process the client's request even if the version in the client's request is actually outdated. Does this make sense?
>> >
>> > IMO, it seems that we can address problem (3) by saving the metadata version together with the offset. After the consumer starts, it will keep fetching metadata until the metadata version >= the version saved with the offset of this partition.
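A rough sketch of that startup fence, with the metadata round trip stubbed out; none of the names below are real client APIs:

    public final class ConsumerStartupFence {

        // Block until the metadata for the partition is at least as new as
        // the epoch that was saved together with the committed offset.
        public static void awaitFreshMetadata(String topicPartition,
                                              int epochSavedWithOffset)
                throws InterruptedException {
            while (fetchLeaderEpochFromMetadata(topicPartition) < epochSavedWithOffset) {
                Thread.sleep(100); // back off, then refresh the metadata again
            }
        }

        // Stub standing in for a real MetadataRequest round trip that extracts
        // the partition's leader epoch from the response.
        private static int fetchLeaderEpochFromMetadata(String topicPartition) {
            return Integer.MAX_VALUE; // placeholder so the sketch terminates
        }
    }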
>> > Regarding problems (1) and (2): currently we use the version number in the MetadataResponse to ensure that the metadata does not go back in time. There are two alternative solutions to address problems (1) and (2). One solution is for the client to enumerate all partitions in the MetadataResponse, compare their epochs with those in the cached metadata, and reject the MetadataResponse iff any leader epoch is smaller. The main concern is that the MetadataResponse currently contains information about all partitions in the entire cluster, so it may slow down the client's performance if we were to do this. The other solution is for the client to enumerate partitions only for the topics registered in org.apache.kafka.clients.Metadata, which is an empty set for the producer and the set of subscribed partitions for the consumer. But this degrades to all topics if the consumer subscribes to topics in the cluster by pattern.
>> >
>> > Note that the client will only be forced to update metadata if the version in the MetadataResponse is smaller than the version in the cached metadata. In general this should not be a problem. It can be a problem only if some broker is particularly slow in processing UpdateMetadataRequest. When this is the case, it means that the broker is also particularly slow in processing LeaderAndIsrRequest, which can cause problems anyway because some partition will probably have no leader during this period. So I am not sure problems (1) and (2) cause more problems than what we already have.
>> >
>> > Thanks,
>> > Dong
>> >
>> > On Wed, Dec 6, 2017 at 6:42 PM, Jun Rao <j...@confluent.io> wrote:
>> >
>> > > Hi, Dong,
>> > >
>> > > Great finding on the issue. It's a real problem. A few comments about the KIP. (1) I am not sure about updating controller_metadata_epoch on every UpdateMetadataRequest. Currently, the controller can send an UpdateMetadataRequest when there is no actual metadata change. Doing this may require unnecessary metadata refreshes on the client. (2) controller_metadata_epoch is global across all topics. This means that a client may be forced to update its metadata even when the metadata for the topics that it cares about hasn't changed. (3) It doesn't seem that the KIP handles the corner case when a consumer is restarted. Say a consumer reads from the new leader, commits the offset and then is restarted. On restart, the consumer gets outdated metadata and fetches from the old leader. Then, the consumer will run into the offset-out-of-range issue.
>> > >
>> > > Given the above, I am thinking of the following approach. We actually already have metadata versioning at the partition level. Each leader has a leader epoch which is monotonically increasing. We can potentially propagate the leader epoch back in the metadata response and the clients can cache that. This solves issues (1) and (2). To solve (3), when saving an offset, we could save both the offset and the corresponding leader epoch. When fetching the data, the consumer provides both the offset and the leader epoch. A leader will only serve the request if its leader epoch is equal to or greater than the leader epoch from the consumer. To achieve this, we need to change the fetch request protocol and the offset commit api, which requires some more thought.
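To illustrate the leader-side check Jun describes (all names below are made up; this is a sketch, not a concrete protocol proposal):

    public class PartitionLeaderSketch {

        // The broker's current epoch for this partition, bumped by the
        // controller on every leader change.
        private volatile int leaderEpoch;

        public String handleFetch(long fetchOffset, int consumerLeaderEpoch) {
            if (consumerLeaderEpoch > leaderEpoch) {
                // The consumer has seen a newer leader than this broker, i.e.
                // this broker's state is stale (it may not have processed the
                // latest LeaderAndIsrRequest yet). Ask the client to retry
                // instead of answering with an out-of-range error that would
                // trigger an offset reset.
                return "ERROR: fetcher's leader epoch is newer, retry later";
            }
            return "DATA from offset " + fetchOffset; // serve the fetch as usual
        }
    }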
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > > On Wed, Dec 6, 2017 at 10:57 AM, Dong Lin <lindon...@gmail.com> wrote:
>> > >
>> > > > Bump up the thread.
>> > > >
>> > > > It will be great to have more comments on whether we should do it or whether there is a better way to address the motivation of this KIP.
>> > > >
>> > > > On Mon, Dec 4, 2017 at 3:09 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > >
>> > > > > I don't have an interesting rejected alternative solution to put in the KIP. If there is a good alternative solution from anyone in this thread, I am happy to discuss it and update the KIP accordingly.
>> > > > >
>> > > > > Thanks,
>> > > > > Dong
>> > > > >
>> > > > > On Mon, Dec 4, 2017 at 1:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>> > > > >
>> > > > >> It is clearer now.
>> > > > >>
>> > > > >> I noticed that the Rejected Alternatives section is empty. Have you considered any alternatives?
>> > > > >>
>> > > > >> Cheers
>> > > > >>
>> > > > >> On Mon, Dec 4, 2017 at 1:07 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > >>
>> > > > >> > Ted, thanks for catching this. I have updated the sentence to make it readable.
>> > > > >> >
>> > > > >> > Thanks,
>> > > > >> > Dong
>> > > > >> >
>> > > > >> > On Sat, Dec 2, 2017 at 3:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>> > > > >> >
>> > > > >> > > bq. It the controller_epoch of the incoming MetadataResponse, or if the controller_epoch is the same but the controller_metadata_epoch
>> > > > >> > >
>> > > > >> > > Can you update the above sentence so that the intention is clearer?
>> > > > >> > >
>> > > > >> > > Thanks
>> > > > >> > >
>> > > > >> > > On Fri, Dec 1, 2017 at 6:33 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > >> > >
>> > > > >> > > > Hi all,
>> > > > >> > > >
>> > > > >> > > > I have created KIP-232: Detect outdated metadata by adding ControllerMetadataEpoch field: https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+by+adding+ControllerMetadataEpoch+field
>> > > > >> > > >
>> > > > >> > > > The KIP proposes to add fields to MetadataResponse and UpdateMetadataRequest so that the client can reject outdated metadata and avoid an unnecessary OffsetOutOfRangeException. Otherwise there is currently a race condition that can cause the consumer to reset its offset, which negatively affects the consumer's availability.
>> > > > >> > > >
>> > > > >> > > > Feedback and suggestions are welcome!
>> > > > >> > > >
>> > > > >> > > > Regards,
>> > > > >> > > > Dong
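For anyone skimming the thread: the staleness check that Ted's quoted sentence refers to boils down to something like the sketch below. Only the two epoch fields come from the KIP; the surrounding class is illustrative.

    public final class MetadataStalenessCheck {

        // An incoming MetadataResponse is outdated if its controller_epoch is
        // older, or if the controller_epoch is the same but its
        // controller_metadata_epoch is older.
        public static boolean isOutdated(int cachedControllerEpoch,
                                         int cachedMetadataEpoch,
                                         int incomingControllerEpoch,
                                         int incomingMetadataEpoch) {
            if (incomingControllerEpoch != cachedControllerEpoch) {
                return incomingControllerEpoch < cachedControllerEpoch;
            }
            return incomingMetadataEpoch < cachedMetadataEpoch;
        }
    }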