On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote:
> Hey Colin,
>
> Thanks for reviewing the KIP.
>
> If I understand you right, you may be suggesting that we can use a global
> metadataEpoch that is incremented every time the controller updates
> metadata. The problem with this solution is that, if a topic is deleted
> and created again, the user will not know that the offset stored before
> the topic deletion is no longer valid. This motivates the idea to include
> a per-partition partitionEpoch. Does this sound reasonable?

Hi Dong,

Perhaps we can store the last valid offset of each deleted topic in
ZooKeeper. Then, when a topic with one of those names gets re-created, we
can start the topic at the previous end offset rather than at 0. This
preserves immutability. It is no more burdensome than having to preserve a
"last epoch" for the deleted partition somewhere, right?
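(As a rough sketch of the bookkeeping this suggestion would require -- the
class and method names below are hypothetical, not part of the KIP or of
Kafka:)

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: remember each deleted partition's end offset so that a
    // re-created topic with the same name starts where the old one ended,
    // keeping offsets immutable across delete/re-create.
    public class DeletedPartitionOffsets {
        // Stand-in for a ZooKeeper-backed store, e.g. a znode per partition.
        private final Map<String, Long> lastEndOffset = new HashMap<>();

        public void onTopicDeleted(String topic, int partition, long endOffset) {
            lastEndOffset.put(topic + "-" + partition, endOffset);
        }

        // Start offset for a newly (re-)created partition: the old end
        // offset if the name was used before, otherwise 0.
        public long startOffsetOnCreate(String topic, int partition) {
            return lastEndOffset.getOrDefault(topic + "-" + partition, 0L);
        }
    }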
> Then the next question may be: should we use a global metadataEpoch +
> per-partition partitionEpoch, instead of a per-partition leaderEpoch +
> per-partition partitionEpoch? The former solution using metadataEpoch
> would not work due to the following scenario (provided by Jun):
>
> "Consider the following scenario. In metadata v1, the leader for a
> partition is at broker 1. In metadata v2, leader is at broker 2. In
> metadata v3, leader is at broker 1 again. The last committed offset in
> v1, v2 and v3 are 10, 20 and 30, respectively. A consumer is started and
> reads metadata v1 and reads messages from offset 0 to 25 from broker 1.
> My understanding is that in the current proposal, the metadata version
> associated with offset 25 is v1. The consumer is then restarted and
> fetches metadata v2. The consumer tries to read from broker 2, which is
> the old leader with the last offset at 20. In this case, the consumer
> will still get OffsetOutOfRangeException incorrectly."
>
> Regarding your comment "For the second purpose, this is "soft state"
> anyway. If the client thinks X is the leader but Y is really the leader,
> the client will talk to X, and X will point out its mistake by sending
> back a NOT_LEADER_FOR_PARTITION.", it is probably not true. The problem
> here is that the old leader X may still think it is the leader of the
> partition and thus it will not send back NOT_LEADER_FOR_PARTITION. The
> reason is provided in KAFKA-6262. Can you check if that makes sense?

This is solvable with a timeout, right? If the leader can't communicate
with the controller for a certain period of time, it should stop acting as
the leader. We have to solve this problem, anyway, in order to fix all the
corner cases.

best,
Colin

> Regards,
> Dong
>
> On Wed, Jan 24, 2018 at 10:39 AM, Colin McCabe <cmcc...@apache.org> wrote:
>
> > Hi Dong,
> >
> > Thanks for proposing this KIP. I think a metadata epoch is a really
> > good idea.
> >
> > I read through the DISCUSS thread, but I still don't have a clear
> > picture of why the proposal uses a metadata epoch per partition rather
> > than a global metadata epoch. A metadata epoch per partition is kind of
> > unpleasant -- it's at least 4 extra bytes per partition that we have to
> > send over the wire in every full metadata request, which could become
> > extra kilobytes on the wire when the number of partitions becomes
> > large. Plus, we have to update all the auxiliary classes to include an
> > epoch.
> >
> > We need to have a global metadata epoch anyway to handle partition
> > addition and deletion. For example, if I give you
> > MetadataResponse{part1, epoch 1, part2, epoch 1} and {part1, epoch 1},
> > which MetadataResponse is newer? You have no way of knowing. It could
> > be that part2 has just been created, and the response with 2 partitions
> > is newer. Or it could be that part2 has just been deleted, and
> > therefore the response with 1 partition is newer. You must have a
> > global epoch to disambiguate these two cases.
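(To make that ambiguity concrete, a small sketch -- the types here are
hypothetical stand-ins, not Kafka's actual MetadataResponse classes:)

    import java.util.Map;

    public class MetadataOrdering {
        // With a global epoch stamped on the whole response, ordering two
        // responses is trivial: the larger epoch wins.
        static boolean isNewer(long globalEpochA, long globalEpochB) {
            return globalEpochA > globalEpochB;
        }

        // With only per-partition epochs there is no such rule: given
        // a = {part1: 1, part2: 1} and b = {part1: 1}, b could predate
        // part2's creation or postdate part2's deletion, so neither
        // response can be declared newer.
        static boolean isComparable(Map<String, Integer> a,
                                    Map<String, Integer> b) {
            return a.keySet().equals(b.keySet());
        }
    }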
> > Previously, I worked on the Ceph distributed filesystem. Ceph had the
> > concept of a map of the whole cluster, maintained by a few servers
> > doing paxos. This map was versioned by a single 64-bit epoch number
> > which increased on every change. It was propagated to clients through
> > gossip. I wonder if something similar could work here?
> >
> > It seems like the Kafka MetadataResponse serves two somewhat unrelated
> > purposes. Firstly, it lets clients know what partitions exist in the
> > system and where they live. Secondly, it lets clients know which nodes
> > within the partition are in-sync (in the ISR) and which node is the
> > leader.
> >
> > The first purpose is what you really need a metadata epoch for, I
> > think. You want to know whether a partition exists or not, or you want
> > to know which nodes you should talk to in order to write to a given
> > partition. A single metadata epoch for the whole response should be
> > adequate here. We should not change the partition assignment without
> > going through ZooKeeper (or a similar system), and this inherently
> > serializes updates into a numbered stream. Brokers should also stop
> > responding to requests when they are unable to contact ZK for a certain
> > time period. This prevents the case where a given partition has been
> > moved off some set of nodes, but a client still ends up talking to
> > those nodes and writing data there.
> >
> > For the second purpose, this is "soft state" anyway. If the client
> > thinks X is the leader but Y is really the leader, the client will talk
> > to X, and X will point out its mistake by sending back a
> > NOT_LEADER_FOR_PARTITION. Then the client can update its metadata again
> > and find the new leader, if there is one. There is no need for an epoch
> > to handle this. Similarly, I can't think of a reason why changing the
> > in-sync replica set needs to bump the epoch.
> >
> > best,
> > Colin
> >
> > On Wed, Jan 24, 2018, at 09:45, Dong Lin wrote:
> > > Thanks much for reviewing the KIP!
> > >
> > > Dong
> > >
> > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Yeah that makes sense, again I'm just making sure we understand all
> > > > the scenarios and what to expect.
> > > >
> > > > I agree that if, more generally speaking, say a user has only
> > > > consumed to offset 8, and then calls seek(16) to "jump" to a further
> > > > position, then she needs to be aware that OORE may be thrown and she
> > > > needs to handle it or rely on the reset policy, which should not
> > > > surprise her.
> > > >
> > > > I'm +1 on the KIP.
> > > >
> > > > Guozhang
> > > >
> > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Yes, in general we cannot prevent OffsetOutOfRangeException if the
> > > > > user seeks to a wrong offset. The main goal is to prevent
> > > > > OffsetOutOfRangeException if the user has done things in the right
> > > > > way, e.g. the user should know that there is a message with this
> > > > > offset.
> > > > >
> > > > > For example, if the user calls seek(..) right after construction,
> > > > > the only reason I can think of is that the user stores offsets
> > > > > externally. In this case, the user currently needs to use the
> > > > > offset which was obtained using position(..) from the last run.
> > > > > With this KIP, the user needs to get the offset and the
> > > > > offsetEpoch using positionAndOffsetEpoch(...) and store this
> > > > > information externally. The next time the user starts the
> > > > > consumer, he/she needs to call seek(..., offset, offsetEpoch)
> > > > > right after construction. Then the KIP should be able to ensure
> > > > > that we don't throw OffsetOutOfRangeException if there is no
> > > > > unclean leader election.
> > > > >
> > > > > Does this sound OK?
> > > > >
> > > > > Regards,
> > > > > Dong
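(A sketch of that store-offsets-externally flow. positionAndOffsetEpoch(...)
and the seek(..., offset, offsetEpoch) overload are the KIP-232 proposals;
everything else here -- the interface and the map-backed store -- is a
hypothetical stand-in, not the shipped consumer API:)

    import java.util.HashMap;
    import java.util.Map;

    // Minimal stand-ins for the additions the KIP proposes.
    class OffsetAndEpoch {
        final long offset;
        final int offsetEpoch;
        OffsetAndEpoch(long offset, int offsetEpoch) {
            this.offset = offset;
            this.offsetEpoch = offsetEpoch;
        }
    }

    interface EpochAwareConsumer {
        OffsetAndEpoch positionAndOffsetEpoch(String topicPartition); // proposed
        void seek(String topicPartition, long offset, int offsetEpoch); // proposed
    }

    public class ExternalOffsetStoreExample {
        // Placeholder for an external store (a database, etc.).
        static final Map<String, OffsetAndEpoch> externalStore = new HashMap<>();

        // At the end of a run: persist the offset together with its epoch,
        // instead of the offset alone from position(..).
        static void saveProgress(EpochAwareConsumer consumer, String tp) {
            externalStore.put(tp, consumer.positionAndOffsetEpoch(tp));
        }

        // At the start of the next run: seek with both values right after
        // construction, so the consumer can detect outdated metadata.
        static void restoreProgress(EpochAwareConsumer consumer, String tp) {
            OffsetAndEpoch saved = externalStore.get(tp);
            if (saved != null) {
                consumer.seek(tp, saved.offset, saved.offsetEpoch);
            }
        }
    }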
> > > > > On Tue, Jan 23, 2018 at 11:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > "If consumer wants to consume message with offset 16, then
> > > > > > consumer must have already fetched message with offset 15"
> > > > > >
> > > > > > --> this may not always be true, right? What if the consumer
> > > > > > just calls seek(16) after construction and then polls, without
> > > > > > a committed offset ever stored before? Admittedly it is rare,
> > > > > > but we do not programmatically disallow it.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Tue, Jan 23, 2018 at 10:42 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey Guozhang,
> > > > > > >
> > > > > > > Thanks much for reviewing the KIP!
> > > > > > >
> > > > > > > In the scenario you described, let's assume that broker A has
> > > > > > > messages with offset up to 10, and broker B has messages with
> > > > > > > offset up to 20. If the consumer wants to consume the message
> > > > > > > with offset 9, it will not receive OffsetOutOfRangeException
> > > > > > > from broker A.
> > > > > > >
> > > > > > > If the consumer wants to consume the message with offset 16,
> > > > > > > then the consumer must have already fetched the message with
> > > > > > > offset 15, which can only come from broker B. Because the
> > > > > > > consumer will fetch from broker B only if leaderEpoch >= 2,
> > > > > > > the current consumer leaderEpoch cannot be 1, since this KIP
> > > > > > > prevents leaderEpoch rewind. Thus we will not have
> > > > > > > OffsetOutOfRangeException in this case.
> > > > > > >
> > > > > > > Does this address your question, or maybe there is a more
> > > > > > > advanced scenario that the KIP does not handle?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
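(The "prevents leaderEpoch rewind" rule Dong relies on amounts to a
one-line check against a client-side cache; a sketch, with hypothetical
names:)

    import java.util.HashMap;
    import java.util.Map;

    // Sketch: never accept metadata whose leaderEpoch is older than what
    // the client has already seen for a partition.
    public class LeaderEpochCache {
        private final Map<String, Integer> lastSeenLeaderEpoch = new HashMap<>();

        // Returns true if the metadata update should be applied, false if
        // it is stale (its leaderEpoch rewinds behind the cached one).
        public boolean maybeUpdate(String topicPartition, int leaderEpoch) {
            Integer cached = lastSeenLeaderEpoch.get(topicPartition);
            if (cached != null && leaderEpoch < cached) {
                return false; // stale: e.g. epoch 1 after we saw epoch 2
            }
            lastSeenLeaderEpoch.put(topicPartition, leaderEpoch);
            return true;
        }
    }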
> > > > > > > On Tue, Jan 23, 2018 at 9:43 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Thanks Dong, I made a pass over the wiki and it lgtm.
> > > > > > > >
> > > > > > > > Just a quick question: can we completely eliminate the
> > > > > > > > OffsetOutOfRangeException with this approach? Say there are
> > > > > > > > consecutive leader changes such that the cached metadata's
> > > > > > > > partition epoch is 1, and the metadata fetch response
> > > > > > > > returns with partition epoch 2 pointing to leader broker A,
> > > > > > > > while the actual up-to-date metadata has partition epoch 3
> > > > > > > > whose leader is now broker B; the metadata refresh will
> > > > > > > > still succeed and the follow-up fetch request may still see
> > > > > > > > OORE?
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > > On Tue, Jan 23, 2018 at 3:47 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I would like to start the voting process for KIP-232:
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-232%3A+Detect+outdated+metadata+using+leaderEpoch+and+partitionEpoch
> > > > > > > > >
> > > > > > > > > The KIP will help fix a concurrency issue in Kafka which
> > > > > > > > > currently can cause message loss or message duplication
> > > > > > > > > in the consumer.
> > > > > > > > >
> > > > > > > > > Regards,
> > > > > > > > > Dong
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > >
> > > > --
> > > > -- Guozhang