Hey Jun,

Certainly. We can discuss later after KIP-320 settles.
Thanks! Dong On Wed, Jul 11, 2018 at 8:54 AM, Jun Rao <j...@confluent.io> wrote: > Hi, Dong, > > Sorry for the late response. Since KIP-320 is covering some of the similar > problems described in this KIP, perhaps we can wait until KIP-320 settles > and see what's still left uncovered in this KIP. > > Thanks, > > Jun > > On Mon, Jun 4, 2018 at 7:03 PM, Dong Lin <lindon...@gmail.com> wrote: > > > Hey Jun, > > > > It seems that we have made considerable progress on the discussion of > > KIP-253 since February. Do you think we should continue the discussion > > there, or can we continue the voting for this KIP? I am happy to submit > the > > PR and move forward the progress for this KIP. > > > > Thanks! > > Dong > > > > > > On Wed, Feb 7, 2018 at 11:42 PM, Dong Lin <lindon...@gmail.com> wrote: > > > > > Hey Jun, > > > > > > Sure, I will come up with a KIP this week. I think there is a way to > > allow > > > partition expansion to arbitrary number without introducing new > concepts > > > such as read-only partition or repartition epoch. > > > > > > Thanks, > > > Dong > > > > > > On Wed, Feb 7, 2018 at 5:28 PM, Jun Rao <j...@confluent.io> wrote: > > > > > >> Hi, Dong, > > >> > > >> Thanks for the reply. The general idea that you had for adding > > partitions > > >> is similar to what we had in mind. It would be useful to make this > more > > >> general, allowing adding an arbitrary number of partitions (instead of > > >> just > > >> doubling) and potentially removing partitions as well. The following > is > > >> the > > >> high level idea from the discussion with Colin, Jason and Ismael. > > >> > > >> * To change the number of partitions from X to Y in a topic, the > > >> controller > > >> marks all existing X partitions as read-only and creates Y new > > partitions. > > >> The new partitions are writable and are tagged with a higher > repartition > > >> epoch (RE). > > >> > > >> * The controller propagates the new metadata to every broker. Once the > > >> leader of a partition is marked as read-only, it rejects the produce > > >> requests on this partition. The producer will then refresh the > metadata > > >> and > > >> start publishing to the new writable partitions. > > >> > > >> * The consumers will then be consuming messages in RE order. The > > consumer > > >> coordinator will only assign partitions in the same RE to consumers. > > Only > > >> after all messages in an RE are consumed, will partitions in a higher > RE > > >> be > > >> assigned to consumers. > > >> > > >> As Colin mentioned, if we do the above, we could potentially (1) use a > > >> globally unique partition id, or (2) use a globally unique topic id to > > >> distinguish recreated partitions due to topic deletion. > > >> > > >> So, perhaps we can sketch out the re-partitioning KIP a bit more and > see > > >> if > > >> there is any overlap with KIP-232. Would you be interested in doing > > that? > > >> If not, we can do that next week. > > >> > > >> Jun > > >> > > >> > > >> On Tue, Feb 6, 2018 at 11:30 AM, Dong Lin <lindon...@gmail.com> > wrote: > > >> > > >> > Hey Jun, > > >> > > > >> > Interestingly I am also planning to sketch a KIP to allow partition > > >> > expansion for keyed topics after this KIP. Since you are already > doing > > >> > that, I guess I will just share my high level idea here in case it > is > > >> > helpful. > > >> > > > >> > The motivation for the KIP is that we currently lose order guarantee > > for > > >> > messages with the same key if we expand partitions of keyed topic. 
> > >> > > > >> > The solution can probably be built upon the following ideas: > > >> > > > >> > - Partition number of the keyed topic should always be doubled (or > > >> > multiplied by power of 2). Given that we select a partition based on > > >> > hash(key) % partitionNum, this should help us ensure that, a message > > >> > assigned to an existing partition will not be mapped to another > > existing > > >> > partition after partition expansion. > > >> > > > >> > - Producer includes in the ProduceRequest some information that > helps > > >> > ensure that messages produced ti a partition will monotonically > > >> increase in > > >> > the partitionNum of the topic. In other words, if broker receives a > > >> > ProduceRequest and notices that the producer does not know the > > partition > > >> > number has increased, broker should reject this request. That > > >> "information" > > >> > maybe leaderEpoch, max partitionEpoch of the partitions of the > topic, > > or > > >> > simply partitionNum of the topic. The benefit of this property is > that > > >> we > > >> > can keep the new logic for in-order message consumption entirely in > > how > > >> > consumer leader determines the partition -> consumer mapping. > > >> > > > >> > - When consumer leader determines partition -> consumer mapping, > > leader > > >> > first reads the start position for each partition using > > >> OffsetFetchRequest. > > >> > If start position are all non-zero, then assignment can be done in > its > > >> > current manner. The assumption is that, a message in the new > partition > > >> > should only be consumed after all messages with the same key > produced > > >> > before it has been consumed. Since some messages in the new > partition > > >> has > > >> > been consumed, we should not worry about consuming messages > > >> out-of-order. > > >> > This benefit of this approach is that we can avoid unnecessary > > overhead > > >> in > > >> > the common case. > > >> > > > >> > - If the consumer leader finds that the start position for some > > >> partition > > >> > is 0. Say the current partition number is 18 and the partition index > > is > > >> 12, > > >> > then consumer leader should ensure that messages produced to > partition > > >> 12 - > > >> > 18/2 = 3 before the first message of partition 12 is consumed, > before > > it > > >> > assigned partition 12 to any consumer in the consumer group. Since > we > > >> have > > >> > a "information" that is monotonically increasing per partition, > > consumer > > >> > can read the value of this information from the first message in > > >> partition > > >> > 12, get the offset corresponding to this value in partition 3, > assign > > >> > partition except for partition 12 (and probably other new > partitions) > > to > > >> > the existing consumers, waiting for the committed offset to go > beyond > > >> this > > >> > offset for partition 3, and trigger rebalance again so that > partition > > 3 > > >> can > > >> > be reassigned to some consumer. > > >> > > > >> > > > >> > Thanks, > > >> > Dong > > >> > > > >> > > > >> > On Tue, Feb 6, 2018 at 10:10 AM, Jun Rao <j...@confluent.io> wrote: > > >> > > > >> > > Hi, Dong, > > >> > > > > >> > > Thanks for the KIP. It looks good overall. We are working on a > > >> separate > > >> > KIP > > >> > > for adding partitions while preserving the ordering guarantees. > That > > >> may > > >> > > require another flavor of partition epoch. 
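A minimal sketch of the doubling property described above, assuming the usual hash(key) % partitionCount assignment (the class and variable names are illustrative, not Kafka code): when the partition count doubles from N to 2N, a key that hashed to partition p can only land in p or p + N, so the "parent" of a new partition is its index minus the old count, e.g. partition 12 of a topic grown from 9 to 18 partitions descends from partition 12 - 9 = 3.

import java.util.stream.IntStream;

// Illustrative only: checks the doubling property for hash(key) % partitionCount.
public class DoublingCheck {

    static int partitionFor(int keyHash, int partitionCount) {
        return (keyHash & 0x7fffffff) % partitionCount;   // mask to non-negative
    }

    public static void main(String[] args) {
        int oldCount = 9;              // e.g. the topic had 9 partitions
        int newCount = 2 * oldCount;   // doubled to 18

        IntStream.range(0, 1_000_000).forEach(keyHash -> {
            int oldPartition = partitionFor(keyHash, oldCount);
            int newPartition = partitionFor(keyHash, newCount);
            // A key stays in its old partition or moves to oldPartition + oldCount;
            // it never lands in a *different* pre-existing partition.
            if (newPartition != oldPartition && newPartition != oldPartition + oldCount) {
                throw new AssertionError("doubling property violated for hash " + keyHash);
            }
        });

        // The "parent" of new partition 12 in an 18-partition topic is 12 - 9 = 3.
        System.out.println("parent of partition 12 is " + (12 - oldCount));
    }
}

This is why the proposal restricts expansion to powers of two: with an arbitrary new count, keys can move between pre-existing partitions and the per-key ordering argument no longer holds.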
It's not very clear > > whether > > >> > that > > >> > > partition epoch can be merged with the partition epoch in this > KIP. > > >> So, > > >> > > perhaps you can wait on this a bit until we post the other KIP in > > the > > >> > next > > >> > > few days. > > >> > > > > >> > > Jun > > >> > > > > >> > > > > >> > > > > >> > > On Mon, Feb 5, 2018 at 2:43 PM, Becket Qin <becket....@gmail.com> > > >> wrote: > > >> > > > > >> > > > +1 on the KIP. > > >> > > > > > >> > > > I think the KIP is mainly about adding the capability of > tracking > > >> the > > >> > > > system state change lineage. It does not seem necessary to > bundle > > >> this > > >> > > KIP > > >> > > > with replacing the topic partition with partition epoch in > > >> > produce/fetch. > > >> > > > Replacing topic-partition string with partition epoch is > > >> essentially a > > >> > > > performance improvement on top of this KIP. That can probably be > > >> done > > >> > > > separately. > > >> > > > > > >> > > > Thanks, > > >> > > > > > >> > > > Jiangjie (Becket) Qin > > >> > > > > > >> > > > On Mon, Jan 29, 2018 at 11:52 AM, Dong Lin <lindon...@gmail.com > > > > >> > wrote: > > >> > > > > > >> > > > > Hey Colin, > > >> > > > > > > >> > > > > On Mon, Jan 29, 2018 at 11:23 AM, Colin McCabe < > > >> cmcc...@apache.org> > > >> > > > wrote: > > >> > > > > > > >> > > > > > > On Mon, Jan 29, 2018 at 10:35 AM, Dong Lin < > > >> lindon...@gmail.com> > > >> > > > > wrote: > > >> > > > > > > > > >> > > > > > > > Hey Colin, > > >> > > > > > > > > > >> > > > > > > > I understand that the KIP will adds overhead by > > introducing > > >> > > > > > per-partition > > >> > > > > > > > partitionEpoch. I am open to alternative solutions that > > does > > >> > not > > >> > > > > incur > > >> > > > > > > > additional overhead. But I don't see a better way now. > > >> > > > > > > > > > >> > > > > > > > IMO the overhead in the FetchResponse may not be that > > much. > > >> We > > >> > > > > probably > > >> > > > > > > > should discuss the percentage increase rather than the > > >> absolute > > >> > > > > number > > >> > > > > > > > increase. Currently after KIP-227, per-partition header > > has > > >> 23 > > >> > > > bytes. > > >> > > > > > This > > >> > > > > > > > KIP adds another 4 bytes. Assume the records size is > 10KB, > > >> the > > >> > > > > > percentage > > >> > > > > > > > increase is 4 / (23 + 10000) = 0.03%. It seems > negligible, > > >> > right? > > >> > > > > > > > >> > > > > > Hi Dong, > > >> > > > > > > > >> > > > > > Thanks for the response. I agree that the FetchRequest / > > >> > > FetchResponse > > >> > > > > > overhead should be OK, now that we have incremental fetch > > >> requests > > >> > > and > > >> > > > > > responses. However, there are a lot of cases where the > > >> percentage > > >> > > > > increase > > >> > > > > > is much greater. 
For example, if a client is doing full > > >> > > > > MetadataRequests / > > >> > > > > > Responses, we have some math kind of like this per > partition: > > >> > > > > > > > >> > > > > > > UpdateMetadataRequestPartitionState => topic partition > > >> > > > > controller_epoch > > >> > > > > > leader leader_epoch partition_epoch isr zk_version replicas > > >> > > > > > offline_replicas > > >> > > > > > > 14 bytes: topic => string (assuming about 10 byte topic > > >> names) > > >> > > > > > > 4 bytes: partition => int32 > > >> > > > > > > 4 bytes: conroller_epoch => int32 > > >> > > > > > > 4 bytes: leader => int32 > > >> > > > > > > 4 bytes: leader_epoch => int32 > > >> > > > > > > +4 EXTRA bytes: partition_epoch => int32 <-- NEW > > >> > > > > > > 2+4+4+4 bytes: isr => [int32] (assuming 3 in the ISR) > > >> > > > > > > 4 bytes: zk_version => int32 > > >> > > > > > > 2+4+4+4 bytes: replicas => [int32] (assuming 3 replicas) > > >> > > > > > > 2 offline_replicas => [int32] (assuming no offline > > replicas) > > >> > > > > > > > >> > > > > > Assuming I added that up correctly, the per-partition > overhead > > >> goes > > >> > > > from > > >> > > > > > 64 bytes per partition to 68, a 6.2% increase. > > >> > > > > > > > >> > > > > > We could do similar math for a lot of the other RPCs. And > you > > >> will > > >> > > > have > > >> > > > > a > > >> > > > > > similar memory and garbage collection impact on the brokers > > >> since > > >> > you > > >> > > > > have > > >> > > > > > to store all this extra state as well. > > >> > > > > > > > >> > > > > > > >> > > > > That is correct. IMO the Metadata is only updated periodically > > >> and is > > >> > > > > probably not a big deal if we increase it by 6%. The > > FetchResponse > > >> > and > > >> > > > > ProduceRequest are probably the only requests that are bounded > > by > > >> the > > >> > > > > bandwidth throughput. > > >> > > > > > > >> > > > > > > >> > > > > > > > >> > > > > > > > > > >> > > > > > > > I agree that we can probably save more space by using > > >> partition > > >> > > ID > > >> > > > so > > >> > > > > > that > > >> > > > > > > > we no longer needs the string topic name. The similar > idea > > >> has > > >> > > also > > >> > > > > > been > > >> > > > > > > > put in the Rejected Alternative section in KIP-227. > While > > >> this > > >> > > idea > > >> > > > > is > > >> > > > > > > > promising, it seems orthogonal to the goal of this KIP. > > >> Given > > >> > > that > > >> > > > > > there is > > >> > > > > > > > already many work to do in this KIP, maybe we can do the > > >> > > partition > > >> > > > ID > > >> > > > > > in a > > >> > > > > > > > separate KIP? > > >> > > > > > > > >> > > > > > I guess my thinking is that the goal here is to replace an > > >> > identifier > > >> > > > > > which can be re-used (the tuple of topic name, partition ID) > > >> with > > >> > an > > >> > > > > > identifier that cannot be re-used (the tuple of topic name, > > >> > partition > > >> > > > ID, > > >> > > > > > partition epoch) in order to gain better semantics. As long > > as > > >> we > > >> > > are > > >> > > > > > replacing the identifier, why not replace it with an > > identifier > > >> > that > > >> > > > has > > >> > > > > > important performance advantages? The KIP freeze for the > next > > >> > > release > > >> > > > > has > > >> > > > > > already passed, so there is time to do this. 
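The per-partition arithmetic quoted above is easy to reproduce. A back-of-the-envelope sketch using the same assumptions from the message (roughly 10-byte topic names, 3 replicas, 3 brokers in the ISR, no offline replicas); the field sizes are the quoted estimates, not a wire-format encoder:

// Reproduces the per-partition size estimate discussed above (not a wire encoder).
public class PartitionStateSizeEstimate {
    public static void main(String[] args) {
        int topic = 14;             // string, assuming ~10-byte topic names
        int partition = 4;          // int32
        int controllerEpoch = 4;    // int32
        int leader = 4;             // int32
        int leaderEpoch = 4;        // int32
        int partitionEpoch = 4;     // int32 -- the field added by this KIP
        int isr = 2 + 3 * 4;        // array header + 3 replica ids
        int zkVersion = 4;          // int32
        int replicas = 2 + 3 * 4;   // array header + 3 replica ids
        int offlineReplicas = 2;    // empty array

        int before = topic + partition + controllerEpoch + leader + leaderEpoch
                + isr + zkVersion + replicas + offlineReplicas;   // 64 bytes
        int after = before + partitionEpoch;                      // 68 bytes
        System.out.printf("%d -> %d bytes per partition (+%.2f%%)%n",
                before, after, 100.0 * partitionEpoch / before);  // +6.25%
    }
}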
> > >> > > > > > > > >> > > > > > > >> > > > > In general it can be easier for discussion and implementation > if > > >> we > > >> > can > > >> > > > > split a larger task into smaller and independent tasks. For > > >> example, > > >> > > > > KIP-112 and KIP-113 both deals with the JBOD support. KIP-31, > > >> KIP-32 > > >> > > and > > >> > > > > KIP-33 are about timestamp support. The option on this can be > > >> subject > > >> > > > > though. > > >> > > > > > > >> > > > > IMO the change to switch from (topic, partition ID) to > > >> partitionEpch > > >> > in > > >> > > > all > > >> > > > > request/response requires us to going through all request one > by > > >> one. > > >> > > It > > >> > > > > may not be hard but it can be time consuming and tedious. At > > high > > >> > level > > >> > > > the > > >> > > > > goal and the change for that will be orthogonal to the changes > > >> > required > > >> > > > in > > >> > > > > this KIP. That is the main reason I think we can split them > into > > >> two > > >> > > > KIPs. > > >> > > > > > > >> > > > > > > >> > > > > > On Mon, Jan 29, 2018, at 10:54, Dong Lin wrote: > > >> > > > > > > I think it is possible to move to entirely use > > partitionEpoch > > >> > > instead > > >> > > > > of > > >> > > > > > > (topic, partition) to identify a partition. Client can > > obtain > > >> the > > >> > > > > > > partitionEpoch -> (topic, partition) mapping from > > >> > MetadataResponse. > > >> > > > We > > >> > > > > > > probably need to figure out a way to assign partitionEpoch > > to > > >> > > > existing > > >> > > > > > > partitions in the cluster. But this should be doable. > > >> > > > > > > > > >> > > > > > > This is a good idea. I think it will save us some space in > > the > > >> > > > > > > request/response. The actual space saving in percentage > > >> probably > > >> > > > > depends > > >> > > > > > on > > >> > > > > > > the amount of data and the number of partitions of the > same > > >> > topic. > > >> > > I > > >> > > > > just > > >> > > > > > > think we can do it in a separate KIP. > > >> > > > > > > > >> > > > > > Hmm. How much extra work would be required? It seems like > we > > >> are > > >> > > > > already > > >> > > > > > changing almost every RPC that involves topics and > partitions, > > >> > > already > > >> > > > > > adding new per-partition state to ZooKeeper, already > changing > > >> how > > >> > > > clients > > >> > > > > > interact with partitions. Is there some other big piece of > > work > > >> > we'd > > >> > > > > have > > >> > > > > > to do to move to partition IDs that we wouldn't need for > > >> partition > > >> > > > > epochs? > > >> > > > > > I guess we'd have to find a way to support regular > > >> expression-based > > >> > > > topic > > >> > > > > > subscriptions. If we split this into multiple KIPs, > wouldn't > > we > > >> > end > > >> > > up > > >> > > > > > changing all that RPCs and ZK state a second time? Also, > I'm > > >> > curious > > >> > > > if > > >> > > > > > anyone has done any proof of concept GC, memory, and network > > >> usage > > >> > > > > > measurements on switching topic names for topic IDs. > > >> > > > > > > > >> > > > > > > >> > > > > > > >> > > > > We will need to go over all requests/responses to check how to > > >> > replace > > >> > > > > (topic, partition ID) with partition epoch. It requires > > >> non-trivial > > >> > > work > > >> > > > > and could take time. 
As you mentioned, we may want to see how > > much > > >> > > saving > > >> > > > > we can get by switching from topic names to partition epoch. > > That > > >> > > itself > > >> > > > > requires time and experiment. It seems that the new idea does > > not > > >> > > > rollback > > >> > > > > any change proposed in this KIP. So I am not sure we can get > > much > > >> by > > >> > > > > putting them into the same KIP. > > >> > > > > > > >> > > > > Anyway, if more people are interested in seeing the new idea > in > > >> the > > >> > > same > > >> > > > > KIP, I can try that. > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > > >> > > > > > best, > > >> > > > > > Colin > > >> > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > On Mon, Jan 29, 2018 at 10:18 AM, Colin McCabe < > > >> > > cmcc...@apache.org > > >> > > > > > > >> > > > > > wrote: > > >> > > > > > > > > > >> > > > > > > >> On Fri, Jan 26, 2018, at 12:17, Dong Lin wrote: > > >> > > > > > > >> > Hey Colin, > > >> > > > > > > >> > > > >> > > > > > > >> > > > >> > > > > > > >> > On Fri, Jan 26, 2018 at 10:16 AM, Colin McCabe < > > >> > > > > cmcc...@apache.org> > > >> > > > > > > >> wrote: > > >> > > > > > > >> > > > >> > > > > > > >> > > On Thu, Jan 25, 2018, at 16:47, Dong Lin wrote: > > >> > > > > > > >> > > > Hey Colin, > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > Thanks for the comment. > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > On Thu, Jan 25, 2018 at 4:15 PM, Colin McCabe < > > >> > > > > > cmcc...@apache.org> > > >> > > > > > > >> > > wrote: > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > > On Wed, Jan 24, 2018, at 21:07, Dong Lin wrote: > > >> > > > > > > >> > > > > > Hey Colin, > > >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > Thanks for reviewing the KIP. > > >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > If I understand you right, you maybe > suggesting > > >> that > > >> > > we > > >> > > > > can > > >> > > > > > use > > >> > > > > > > >> a > > >> > > > > > > >> > > global > > >> > > > > > > >> > > > > > metadataEpoch that is incremented every time > > >> > > controller > > >> > > > > > updates > > >> > > > > > > >> > > metadata. > > >> > > > > > > >> > > > > > The problem with this solution is that, if a > > >> topic > > >> > is > > >> > > > > > deleted > > >> > > > > > > >> and > > >> > > > > > > >> > > created > > >> > > > > > > >> > > > > > again, user will not know whether that the > > offset > > >> > > which > > >> > > > is > > >> > > > > > > >> stored > > >> > > > > > > >> > > before > > >> > > > > > > >> > > > > > the topic deletion is no longer valid. This > > >> > motivates > > >> > > > the > > >> > > > > > idea > > >> > > > > > > >> to > > >> > > > > > > >> > > include > > >> > > > > > > >> > > > > > per-partition partitionEpoch. Does this sound > > >> > > > reasonable? > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > Hi Dong, > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > Perhaps we can store the last valid offset of > > each > > >> > > deleted > > >> > > > > > topic > > >> > > > > > > >> in > > >> > > > > > > >> > > > > ZooKeeper. 
Then, when a topic with one of > those > > >> names > > >> > > > gets > > >> > > > > > > >> > > re-created, we > > >> > > > > > > >> > > > > can start the topic at the previous end offset > > >> rather > > >> > > than > > >> > > > > at > > >> > > > > > 0. > > >> > > > > > > >> This > > >> > > > > > > >> > > > > preserves immutability. It is no more > burdensome > > >> than > > >> > > > > having > > >> > > > > > to > > >> > > > > > > >> > > preserve a > > >> > > > > > > >> > > > > "last epoch" for the deleted partition > somewhere, > > >> > right? > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > My concern with this solution is that the number > of > > >> > > > zookeeper > > >> > > > > > nodes > > >> > > > > > > >> get > > >> > > > > > > >> > > > more and more over time if some users keep > deleting > > >> and > > >> > > > > creating > > >> > > > > > > >> topics. > > >> > > > > > > >> > > Do > > >> > > > > > > >> > > > you think this can be a problem? > > >> > > > > > > >> > > > > >> > > > > > > >> > > Hi Dong, > > >> > > > > > > >> > > > > >> > > > > > > >> > > We could expire the "partition tombstones" after an > > >> hour > > >> > or > > >> > > > so. > > >> > > > > > In > > >> > > > > > > >> > > practice this would solve the issue for clients > that > > >> like > > >> > to > > >> > > > > > destroy > > >> > > > > > > >> and > > >> > > > > > > >> > > re-create topics all the time. In any case, > doesn't > > >> the > > >> > > > current > > >> > > > > > > >> proposal > > >> > > > > > > >> > > add per-partition znodes as well that we have to > > track > > >> > even > > >> > > > > after > > >> > > > > > the > > >> > > > > > > >> > > partition is deleted? Or did I misunderstand that? > > >> > > > > > > >> > > > > >> > > > > > > >> > > > >> > > > > > > >> > Actually the current KIP does not add per-partition > > >> znodes. > > >> > > > Could > > >> > > > > > you > > >> > > > > > > >> > double check? I can fix the KIP wiki if there is > > anything > > >> > > > > > misleading. > > >> > > > > > > >> > > >> > > > > > > >> Hi Dong, > > >> > > > > > > >> > > >> > > > > > > >> I double-checked the KIP, and I can see that you are in > > >> fact > > >> > > > using a > > >> > > > > > > >> global counter for initializing partition epochs. So, > > you > > >> are > > >> > > > > > correct, it > > >> > > > > > > >> doesn't add per-partition znodes for partitions that no > > >> longer > > >> > > > > exist. > > >> > > > > > > >> > > >> > > > > > > >> > > > >> > > > > > > >> > If we expire the "partition tomstones" after an hour, > > and > > >> > the > > >> > > > > topic > > >> > > > > > is > > >> > > > > > > >> > re-created after more than an hour since the topic > > >> deletion, > > >> > > > then > > >> > > > > > we are > > >> > > > > > > >> > back to the situation where user can not tell whether > > the > > >> > > topic > > >> > > > > has > > >> > > > > > been > > >> > > > > > > >> > re-created or not, right? > > >> > > > > > > >> > > >> > > > > > > >> Yes, with an expiration period, it would not ensure > > >> > > immutability-- > > >> > > > > you > > >> > > > > > > >> could effectively reuse partition names and they would > > look > > >> > the > > >> > > > > same. > > >> > > > > > > >> > > >> > > > > > > >> > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > >> > > > > > > >> > > It's not really clear to me what should happen > when a > > >> > topic > > >> > > is > > >> > > > > > > >> destroyed > > >> > > > > > > >> > > and re-created with new data. 
Should consumers > > >> continue > > >> > to > > >> > > be > > >> > > > > > able to > > >> > > > > > > >> > > consume? We don't know where they stopped > consuming > > >> from > > >> > > the > > >> > > > > > previous > > >> > > > > > > >> > > incarnation of the topic, so messages may have been > > >> lost. > > >> > > > > > Certainly > > >> > > > > > > >> > > consuming data from offset X of the new incarnation > > of > > >> the > > >> > > > topic > > >> > > > > > may > > >> > > > > > > >> give > > >> > > > > > > >> > > something totally different from what you would > have > > >> > gotten > > >> > > > from > > >> > > > > > > >> offset X > > >> > > > > > > >> > > of the previous incarnation of the topic. > > >> > > > > > > >> > > > > >> > > > > > > >> > > > >> > > > > > > >> > With the current KIP, if a consumer consumes a topic > > >> based > > >> > on > > >> > > > the > > >> > > > > > last > > >> > > > > > > >> > remembered (offset, partitionEpoch, leaderEpoch), and > > if > > >> the > > >> > > > topic > > >> > > > > > is > > >> > > > > > > >> > re-created, consume will throw > > >> > InvalidPartitionEpochException > > >> > > > > > because > > >> > > > > > > >> the > > >> > > > > > > >> > previous partitionEpoch will be different from the > > >> current > > >> > > > > > > >> partitionEpoch. > > >> > > > > > > >> > This is described in the Proposed Changes -> > > Consumption > > >> > after > > >> > > > > topic > > >> > > > > > > >> > deletion in the KIP. I can improve the KIP if there > is > > >> > > anything > > >> > > > > not > > >> > > > > > > >> clear. > > >> > > > > > > >> > > >> > > > > > > >> Thanks for the clarification. It sounds like what you > > >> really > > >> > > want > > >> > > > > is > > >> > > > > > > >> immutability-- i.e., to never "really" reuse partition > > >> > > > identifiers. > > >> > > > > > And > > >> > > > > > > >> you do this by making the partition name no longer the > > >> "real" > > >> > > > > > identifier. > > >> > > > > > > >> > > >> > > > > > > >> My big concern about this KIP is that it seems like an > > >> > > > > > anti-scalability > > >> > > > > > > >> feature. Now we are adding 4 extra bytes for every > > >> partition > > >> > in > > >> > > > the > > >> > > > > > > >> FetchResponse and Request, for example. That could be > 40 > > >> kb > > >> > per > > >> > > > > > request, > > >> > > > > > > >> if the user has 10,000 partitions. And of course, the > > KIP > > >> > also > > >> > > > > makes > > >> > > > > > > >> massive changes to UpdateMetadataRequest, > > MetadataResponse, > > >> > > > > > > >> OffsetCommitRequest, OffsetFetchResponse, > > >> LeaderAndIsrRequest, > > >> > > > > > > >> ListOffsetResponse, etc. which will also increase their > > >> size > > >> > on > > >> > > > the > > >> > > > > > wire > > >> > > > > > > >> and in memory. > > >> > > > > > > >> > > >> > > > > > > >> One thing that we talked a lot about in the past is > > >> replacing > > >> > > > > > partition > > >> > > > > > > >> names with IDs. IDs have a lot of really nice > features. > > >> They > > >> > > > take > > >> > > > > > up much > > >> > > > > > > >> less space in memory than strings (especially 2-byte > Java > > >> > > > strings). > > >> > > > > > They > > >> > > > > > > >> can often be allocated on the stack rather than the > heap > > >> > > > (important > > >> > > > > > when > > >> > > > > > > >> you are dealing with hundreds of thousands of them). > > They > > >> can > > >> > > be > > >> > > > > > > >> efficiently deserialized and serialized. 
If we use > > 64-bit > > >> > ones, > > >> > > > we > > >> > > > > > will > > >> > > > > > > >> never run out of IDs, which means that they can always > be > > >> > unique > > >> > > > per > > >> > > > > > > >> partition. > > >> > > > > > > >> > > >> > > > > > > >> Given that the partition name is no longer the "real" > > >> > identifier > > >> > > > for > > >> > > > > > > >> partitions in the current KIP-232 proposal, why not > just > > >> move > > >> > to > > >> > > > > using > > >> > > > > > > >> partition IDs entirely instead of strings? You have to > > >> change > > >> > > all > > >> > > > > the > > >> > > > > > > >> messages anyway. There isn't much point any more to > > >> carrying > > >> > > > around > > >> > > > > > the > > >> > > > > > > >> partition name in every RPC, since you really need > (name, > > >> > epoch) > > >> > > > to > > >> > > > > > > >> identify the partition. > > >> > > > > > > >> Probably the metadata response and a few other messages > > >> would > > >> > > have > > >> > > > > to > > >> > > > > > > >> still carry the partition name, to allow clients to go > > from > > >> > name > > >> > > > to > > >> > > > > > id. > > >> > > > > > > >> But we could mostly forget about the strings. And then > > >> this > > >> > > would > > >> > > > > be > > >> > > > > > a > > >> > > > > > > >> scalability improvement rather than a scalability > > problem. > > >> > > > > > > >> > > >> > > > > > > >> > > > >> > > > > > > >> > > > >> > > > > > > >> > > By choosing to reuse the same (topic, partition, > > >> offset) > > >> > > > > 3-tuple, > > >> > > > > > we > > >> > > > > > > >> have > > >> > > > > > > >> > > > >> > > > > > > >> > chosen to give up immutability. That was a really > bad > > >> > > decision. > > >> > > > > > And > > >> > > > > > > >> now > > >> > > > > > > >> > > we have to worry about time dependencies, stale > > cached > > >> > data, > > >> > > > and > > >> > > > > > all > > >> > > > > > > >> the > > >> > > > > > > >> > > rest. We can't completely fix this inside Kafka no > > >> matter > > >> > > > what > > >> > > > > > we do, > > >> > > > > > > >> > > because not all that cached data is inside Kafka > > >> itself. > > >> > > Some > > >> > > > > of > > >> > > > > > it > > >> > > > > > > >> may be > > >> > > > > > > >> > > in systems that Kafka has sent data to, such as > other > > >> > > daemons, > > >> > > > > SQL > > >> > > > > > > >> > > databases, streams, and so forth. > > >> > > > > > > >> > > > > >> > > > > > > >> > > > >> > > > > > > >> > The current KIP will uniquely identify a message > using > > >> > (topic, > > >> > > > > > > >> partition, > > >> > > > > > > >> > offset, partitionEpoch) 4-tuple. This addresses the > > >> message > > >> > > > > > immutability > > >> > > > > > > >> > issue that you mentioned. Is there any corner case > > where > > >> the > > >> > > > > message > > >> > > > > > > >> > immutability is still not preserved with the current > > KIP? > > >> > > > > > > >> > > > >> > > > > > > >> > > > >> > > > > > > >> > > > > >> > > > > > > >> > > I guess the idea here is that mirror maker should > > work > > >> as > > >> > > > > expected > > >> > > > > > > >> when > > >> > > > > > > >> > > users destroy a topic and re-create it with the > same > > >> name. > > >> > > > > That's > > >> > > > > > > >> kind of > > >> > > > > > > >> > > tough, though, since in that scenario, mirror maker > > >> > probably > > >> > > > > > should > > >> > > > > > > >> destroy > > >> > > > > > > >> > > and re-create the topic on the other end, too, > right? 
> > >> > > > > Otherwise, > > >> > > > > > > >> what you > > >> > > > > > > >> > > end up with on the other end could be half of one > > >> > > incarnation > > >> > > > of > > >> > > > > > the > > >> > > > > > > >> topic, > > >> > > > > > > >> > > and half of another. > > >> > > > > > > >> > > > > >> > > > > > > >> > > What mirror maker really needs is to be able to > > follow > > >> a > > >> > > > stream > > >> > > > > of > > >> > > > > > > >> events > > >> > > > > > > >> > > about the kafka cluster itself. We could have some > > >> master > > >> > > > topic > > >> > > > > > > >> which is > > >> > > > > > > >> > > always present and which contains data about all > > topic > > >> > > > > deletions, > > >> > > > > > > >> > > creations, etc. Then MM can simply follow this > topic > > >> and > > >> > do > > >> > > > > what > > >> > > > > > is > > >> > > > > > > >> needed. > > >> > > > > > > >> > > > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > Then the next question maybe, should we use a > > >> global > > >> > > > > > > >> metadataEpoch + > > >> > > > > > > >> > > > > > per-partition partitionEpoch, instead of > using > > >> > > > > per-partition > > >> > > > > > > >> > > leaderEpoch > > >> > > > > > > >> > > > > + > > >> > > > > > > >> > > > > > per-partition leaderEpoch. The former > solution > > >> using > > >> > > > > > > >> metadataEpoch > > >> > > > > > > >> > > would > > >> > > > > > > >> > > > > > not work due to the following scenario > > (provided > > >> by > > >> > > > Jun): > > >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > "Consider the following scenario. In metadata > > v1, > > >> > the > > >> > > > > leader > > >> > > > > > > >> for a > > >> > > > > > > >> > > > > > partition is at broker 1. In metadata v2, > > leader > > >> is > > >> > at > > >> > > > > > broker > > >> > > > > > > >> 2. In > > >> > > > > > > >> > > > > > metadata v3, leader is at broker 1 again. The > > >> last > > >> > > > > committed > > >> > > > > > > >> offset > > >> > > > > > > >> > > in > > >> > > > > > > >> > > > > v1, > > >> > > > > > > >> > > > > > v2 and v3 are 10, 20 and 30, respectively. A > > >> > consumer > > >> > > is > > >> > > > > > > >> started and > > >> > > > > > > >> > > > > reads > > >> > > > > > > >> > > > > > metadata v1 and reads messages from offset 0 > to > > >> 25 > > >> > > from > > >> > > > > > broker > > >> > > > > > > >> 1. My > > >> > > > > > > >> > > > > > understanding is that in the current > proposal, > > >> the > > >> > > > > metadata > > >> > > > > > > >> version > > >> > > > > > > >> > > > > > associated with offset 25 is v1. The consumer > > is > > >> > then > > >> > > > > > restarted > > >> > > > > > > >> and > > >> > > > > > > >> > > > > fetches > > >> > > > > > > >> > > > > > metadata v2. The consumer tries to read from > > >> broker > > >> > 2, > > >> > > > > > which is > > >> > > > > > > >> the > > >> > > > > > > >> > > old > > >> > > > > > > >> > > > > > leader with the last offset at 20. In this > > case, > > >> the > > >> > > > > > consumer > > >> > > > > > > >> will > > >> > > > > > > >> > > still > > >> > > > > > > >> > > > > > get OffsetOutOfRangeException incorrectly." > > >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > Regarding your comment "For the second > purpose, > > >> this > > >> > > is > > >> > > > > > "soft > > >> > > > > > > >> state" > > >> > > > > > > >> > > > > > anyway. 
If the client thinks X is the leader > > >> but Y > > >> > is > > >> > > > > > really > > >> > > > > > > >> the > > >> > > > > > > >> > > leader, > > >> > > > > > > >> > > > > > the client will talk to X, and X will point > out > > >> its > > >> > > > > mistake > > >> > > > > > by > > >> > > > > > > >> > > sending > > >> > > > > > > >> > > > > back > > >> > > > > > > >> > > > > > a NOT_LEADER_FOR_PARTITION.", it is probably > no > > >> > true. > > >> > > > The > > >> > > > > > > >> problem > > >> > > > > > > >> > > here is > > >> > > > > > > >> > > > > > that the old leader X may still think it is > the > > >> > leader > > >> > > > of > > >> > > > > > the > > >> > > > > > > >> > > partition > > >> > > > > > > >> > > > > and > > >> > > > > > > >> > > > > > thus it will not send back > > >> NOT_LEADER_FOR_PARTITION. > > >> > > The > > >> > > > > > reason > > >> > > > > > > >> is > > >> > > > > > > >> > > > > provided > > >> > > > > > > >> > > > > > in KAFKA-6262. Can you check if that makes > > sense? > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > This is solvable with a timeout, right? If the > > >> leader > > >> > > > can't > > >> > > > > > > >> > > communicate > > >> > > > > > > >> > > > > with the controller for a certain period of > time, > > >> it > > >> > > > should > > >> > > > > > stop > > >> > > > > > > >> > > acting as > > >> > > > > > > >> > > > > the leader. We have to solve this problem, > > >> anyway, in > > >> > > > order > > >> > > > > > to > > >> > > > > > > >> fix > > >> > > > > > > >> > > all the > > >> > > > > > > >> > > > > corner cases. > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > Not sure if I fully understand your proposal. The > > >> > proposal > > >> > > > > > seems to > > >> > > > > > > >> > > require > > >> > > > > > > >> > > > non-trivial changes to our existing leadership > > >> election > > >> > > > > > mechanism. > > >> > > > > > > >> Could > > >> > > > > > > >> > > > you provide more detail regarding how it works? > For > > >> > > example, > > >> > > > > how > > >> > > > > > > >> should > > >> > > > > > > >> > > > user choose this timeout, how leader determines > > >> whether > > >> > it > > >> > > > can > > >> > > > > > still > > >> > > > > > > >> > > > communicate with controller, and how this > triggers > > >> > > > controller > > >> > > > > to > > >> > > > > > > >> elect > > >> > > > > > > >> > > new > > >> > > > > > > >> > > > leader? > > >> > > > > > > >> > > > > >> > > > > > > >> > > Before I come up with any proposal, let me make > sure > > I > > >> > > > > understand > > >> > > > > > the > > >> > > > > > > >> > > problem correctly. My big question was, what > > prevents > > >> > > > > split-brain > > >> > > > > > > >> here? > > >> > > > > > > >> > > > > >> > > > > > > >> > > Let's say I have a partition which is on nodes A, > B, > > >> and > > >> > C, > > >> > > > with > > >> > > > > > > >> min-ISR > > >> > > > > > > >> > > 2. The controller is D. At some point, there is a > > >> > network > > >> > > > > > partition > > >> > > > > > > >> > > between A and B and the rest of the cluster. The > > >> > Controller > > >> > > > > > > >> re-assigns the > > >> > > > > > > >> > > partition to nodes C, D, and E. But A and B keep > > >> chugging > > >> > > > away, > > >> > > > > > even > > >> > > > > > > >> > > though they can no longer communicate with the > > >> controller. 
> > >> > > > > > > >> > > > > >> > > > > > > >> > > At some point, a client with stale metadata writes > to > > >> the > > >> > > > > > partition. > > >> > > > > > > >> It > > >> > > > > > > >> > > still thinks the partition is on node A, B, and C, > so > > >> > that's > > >> > > > > > where it > > >> > > > > > > >> sends > > >> > > > > > > >> > > the data. It's unable to talk to C, but A and B > > reply > > >> > back > > >> > > > that > > >> > > > > > all > > >> > > > > > > >> is > > >> > > > > > > >> > > well. > > >> > > > > > > >> > > > > >> > > > > > > >> > > Is this not a case where we could lose data due to > > >> split > > >> > > > brain? > > >> > > > > > Or is > > >> > > > > > > >> > > there a mechanism for preventing this that I > missed? > > >> If > > >> > it > > >> > > > is, > > >> > > > > it > > >> > > > > > > >> seems > > >> > > > > > > >> > > like a pretty serious failure case that we should > be > > >> > > handling > > >> > > > > > with our > > >> > > > > > > >> > > metadata rework. And I think epoch numbers and > > >> timeouts > > >> > > might > > >> > > > > be > > >> > > > > > > >> part of > > >> > > > > > > >> > > the solution. > > >> > > > > > > >> > > > > >> > > > > > > >> > > > >> > > > > > > >> > Right, split brain can happen if RF=4 and minIsr=2. > > >> > However, I > > >> > > > am > > >> > > > > > not > > >> > > > > > > >> sure > > >> > > > > > > >> > it is a pretty serious issue which we need to address > > >> today. > > >> > > > This > > >> > > > > > can be > > >> > > > > > > >> > prevented by configuring the Kafka topic so that > > minIsr > > > >> > > RF/2. > > >> > > > > > > >> Actually, > > >> > > > > > > >> > if user sets minIsr=2, is there anything reason that > > user > > >> > > wants > > >> > > > to > > >> > > > > > set > > >> > > > > > > >> RF=4 > > >> > > > > > > >> > instead of 4? > > >> > > > > > > >> > > > >> > > > > > > >> > Introducing timeout in leader election mechanism is > > >> > > > non-trivial. I > > >> > > > > > > >> think we > > >> > > > > > > >> > probably want to do that only if there is good > use-case > > >> that > > >> > > can > > >> > > > > not > > >> > > > > > > >> > otherwise be addressed with the current mechanism. > > >> > > > > > > >> > > >> > > > > > > >> I still would like to think about these corner cases > > more. > > >> > But > > >> > > > > > perhaps > > >> > > > > > > >> it's not directly related to this KIP. > > >> > > > > > > >> > > >> > > > > > > >> regards, > > >> > > > > > > >> Colin > > >> > > > > > > >> > > >> > > > > > > >> > > >> > > > > > > >> > > > >> > > > > > > >> > > > >> > > > > > > >> > > best, > > >> > > > > > > >> > > Colin > > >> > > > > > > >> > > > > >> > > > > > > >> > > > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > > > >> > > > > > > >> > > > > best, > > >> > > > > > > >> > > > > Colin > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > Regards, > > >> > > > > > > >> > > > > > Dong > > >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > On Wed, Jan 24, 2018 at 10:39 AM, Colin > McCabe > > < > > >> > > > > > > >> cmcc...@apache.org> > > >> > > > > > > >> > > > > wrote: > > >> > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > > Hi Dong, > > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > Thanks for proposing this KIP. I think a > > >> metadata > > >> > > > epoch > > >> > > > > > is a > > >> > > > > > > >> > > really > > >> > > > > > > >> > > > > good > > >> > > > > > > >> > > > > > > idea. 
> > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > I read through the DISCUSS thread, but I > > still > > >> > don't > > >> > > > > have > > >> > > > > > a > > >> > > > > > > >> clear > > >> > > > > > > >> > > > > picture > > >> > > > > > > >> > > > > > > of why the proposal uses a metadata epoch > per > > >> > > > partition > > >> > > > > > rather > > >> > > > > > > >> > > than a > > >> > > > > > > >> > > > > > > global metadata epoch. A metadata epoch > per > > >> > > partition > > >> > > > > is > > >> > > > > > > >> kind of > > >> > > > > > > >> > > > > > > unpleasant-- it's at least 4 extra bytes > per > > >> > > partition > > >> > > > > > that we > > >> > > > > > > >> > > have to > > >> > > > > > > >> > > > > send > > >> > > > > > > >> > > > > > > over the wire in every full metadata > request, > > >> > which > > >> > > > > could > > >> > > > > > > >> become > > >> > > > > > > >> > > extra > > >> > > > > > > >> > > > > > > kilobytes on the wire when the number of > > >> > partitions > > >> > > > > > becomes > > >> > > > > > > >> large. > > >> > > > > > > >> > > > > Plus, > > >> > > > > > > >> > > > > > > we have to update all the auxillary classes > > to > > >> > > include > > >> > > > > an > > >> > > > > > > >> epoch. > > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > We need to have a global metadata epoch > > anyway > > >> to > > >> > > > handle > > >> > > > > > > >> partition > > >> > > > > > > >> > > > > > > addition and deletion. For example, if I > > give > > >> you > > >> > > > > > > >> > > > > > > MetadataResponse{part1,epoch 1, part2, > epoch > > 1} > > >> > and > > >> > > > > > {part1, > > >> > > > > > > >> > > epoch1}, > > >> > > > > > > >> > > > > which > > >> > > > > > > >> > > > > > > MetadataResponse is newer? You have no way > > of > > >> > > > knowing. > > >> > > > > > It > > >> > > > > > > >> could > > >> > > > > > > >> > > be > > >> > > > > > > >> > > > > that > > >> > > > > > > >> > > > > > > part2 has just been created, and the > response > > >> > with 2 > > >> > > > > > > >> partitions is > > >> > > > > > > >> > > > > newer. > > >> > > > > > > >> > > > > > > Or it coudl be that part2 has just been > > >> deleted, > > >> > and > > >> > > > > > > >> therefore the > > >> > > > > > > >> > > > > response > > >> > > > > > > >> > > > > > > with 1 partition is newer. You must have a > > >> global > > >> > > > epoch > > >> > > > > > to > > >> > > > > > > >> > > > > disambiguate > > >> > > > > > > >> > > > > > > these two cases. > > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > Previously, I worked on the Ceph > distributed > > >> > > > filesystem. > > >> > > > > > > >> Ceph had > > >> > > > > > > >> > > the > > >> > > > > > > >> > > > > > > concept of a map of the whole cluster, > > >> maintained > > >> > > by a > > >> > > > > few > > >> > > > > > > >> servers > > >> > > > > > > >> > > > > doing > > >> > > > > > > >> > > > > > > paxos. This map was versioned by a single > > >> 64-bit > > >> > > > epoch > > >> > > > > > number > > >> > > > > > > >> > > which > > >> > > > > > > >> > > > > > > increased on every change. It was > propagated > > >> to > > >> > > > clients > > >> > > > > > > >> through > > >> > > > > > > >> > > > > gossip. I > > >> > > > > > > >> > > > > > > wonder if something similar could work > here? 
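A small sketch of the ambiguity argument above, assuming a hypothetical response-level epoch carried alongside the per-partition epochs (the Snapshot type and field names are made up for illustration): two metadata snapshots that differ only in partition membership cannot be ordered by per-partition epochs alone, but a single response-level epoch gives a total order.

import java.util.Map;

// Illustrative only: per-partition epochs alone cannot say whether a partition
// was just created or just deleted; a response-level (global) epoch can.
public class MetadataSnapshotOrdering {

    record Snapshot(long globalEpoch, Map<String, Integer> partitionEpochs) {}

    static Snapshot newer(Snapshot a, Snapshot b) {
        // {t-0: e1, t-1: e1} vs. {t-0: e1} is ambiguous without globalEpoch.
        return a.globalEpoch() >= b.globalEpoch() ? a : b;
    }

    public static void main(String[] args) {
        Snapshot twoPartitions = new Snapshot(7, Map.of("t-0", 1, "t-1", 1));
        Snapshot onePartition = new Snapshot(8, Map.of("t-0", 1));
        // The global epoch says the one-partition snapshot is newer, i.e. t-1 was
        // deleted; with epoch 6 instead of 8 it would mean t-1 was just created.
        System.out.println("newer snapshot has "
                + newer(twoPartitions, onePartition).partitionEpochs().size() + " partition(s)");
    }
}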
> > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > It seems like the the Kafka > MetadataResponse > > >> > serves > > >> > > > two > > >> > > > > > > >> somewhat > > >> > > > > > > >> > > > > unrelated > > >> > > > > > > >> > > > > > > purposes. Firstly, it lets clients know > what > > >> > > > partitions > > >> > > > > > > >> exist in > > >> > > > > > > >> > > the > > >> > > > > > > >> > > > > > > system and where they live. Secondly, it > > lets > > >> > > clients > > >> > > > > > know > > >> > > > > > > >> which > > >> > > > > > > >> > > nodes > > >> > > > > > > >> > > > > > > within the partition are in-sync (in the > ISR) > > >> and > > >> > > > which > > >> > > > > > node > > >> > > > > > > >> is the > > >> > > > > > > >> > > > > leader. > > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > The first purpose is what you really need a > > >> > metadata > > >> > > > > epoch > > >> > > > > > > >> for, I > > >> > > > > > > >> > > > > think. > > >> > > > > > > >> > > > > > > You want to know whether a partition exists > > or > > >> > not, > > >> > > or > > >> > > > > you > > >> > > > > > > >> want to > > >> > > > > > > >> > > know > > >> > > > > > > >> > > > > > > which nodes you should talk to in order to > > >> write > > >> > to > > >> > > a > > >> > > > > > given > > >> > > > > > > >> > > > > partition. A > > >> > > > > > > >> > > > > > > single metadata epoch for the whole > response > > >> > should > > >> > > be > > >> > > > > > > >> adequate > > >> > > > > > > >> > > here. > > >> > > > > > > >> > > > > We > > >> > > > > > > >> > > > > > > should not change the partition assignment > > >> without > > >> > > > going > > >> > > > > > > >> through > > >> > > > > > > >> > > > > zookeeper > > >> > > > > > > >> > > > > > > (or a similar system), and this inherently > > >> > > serializes > > >> > > > > > updates > > >> > > > > > > >> into > > >> > > > > > > >> > > a > > >> > > > > > > >> > > > > > > numbered stream. Brokers should also stop > > >> > > responding > > >> > > > to > > >> > > > > > > >> requests > > >> > > > > > > >> > > when > > >> > > > > > > >> > > > > they > > >> > > > > > > >> > > > > > > are unable to contact ZK for a certain time > > >> > period. > > >> > > > > This > > >> > > > > > > >> prevents > > >> > > > > > > >> > > the > > >> > > > > > > >> > > > > case > > >> > > > > > > >> > > > > > > where a given partition has been moved off > > some > > >> > set > > >> > > of > > >> > > > > > nodes, > > >> > > > > > > >> but a > > >> > > > > > > >> > > > > client > > >> > > > > > > >> > > > > > > still ends up talking to those nodes and > > >> writing > > >> > > data > > >> > > > > > there. > > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > For the second purpose, this is "soft > state" > > >> > anyway. > > >> > > > If > > >> > > > > > the > > >> > > > > > > >> client > > >> > > > > > > >> > > > > thinks > > >> > > > > > > >> > > > > > > X is the leader but Y is really the leader, > > the > > >> > > client > > >> > > > > > will > > >> > > > > > > >> talk > > >> > > > > > > >> > > to X, > > >> > > > > > > >> > > > > and > > >> > > > > > > >> > > > > > > X will point out its mistake by sending > back > > a > > >> > > > > > > >> > > > > NOT_LEADER_FOR_PARTITION. > > >> > > > > > > >> > > > > > > Then the client can update its metadata > again > > >> and > > >> > > find > > >> > > > > > the new > > >> > > > > > > >> > > leader, > > >> > > > > > > >> > > > > if > > >> > > > > > > >> > > > > > > there is one. 
There is no need for an > epoch > > to > > >> > > handle > > >> > > > > > this. > > >> > > > > > > >> > > > > Similarly, I > > >> > > > > > > >> > > > > > > can't think of a reason why changing the > > >> in-sync > > >> > > > replica > > >> > > > > > set > > >> > > > > > > >> needs > > >> > > > > > > >> > > to > > >> > > > > > > >> > > > > bump > > >> > > > > > > >> > > > > > > the epoch. > > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > best, > > >> > > > > > > >> > > > > > > Colin > > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > On Wed, Jan 24, 2018, at 09:45, Dong Lin > > wrote: > > >> > > > > > > >> > > > > > > > Thanks much for reviewing the KIP! > > >> > > > > > > >> > > > > > > > > > >> > > > > > > >> > > > > > > > Dong > > >> > > > > > > >> > > > > > > > > > >> > > > > > > >> > > > > > > > On Wed, Jan 24, 2018 at 7:10 AM, Guozhang > > >> Wang < > > >> > > > > > > >> > > wangg...@gmail.com> > > >> > > > > > > >> > > > > > > wrote: > > >> > > > > > > >> > > > > > > > > > >> > > > > > > >> > > > > > > > > Yeah that makes sense, again I'm just > > >> making > > >> > > sure > > >> > > > we > > >> > > > > > > >> understand > > >> > > > > > > >> > > > > all the > > >> > > > > > > >> > > > > > > > > scenarios and what to expect. > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > I agree that if, more generally > speaking, > > >> say > > >> > > > users > > >> > > > > > have > > >> > > > > > > >> only > > >> > > > > > > >> > > > > consumed > > >> > > > > > > >> > > > > > > to > > >> > > > > > > >> > > > > > > > > offset 8, and then call seek(16) to > > "jump" > > >> to > > >> > a > > >> > > > > > further > > >> > > > > > > >> > > position, > > >> > > > > > > >> > > > > then > > >> > > > > > > >> > > > > > > she > > >> > > > > > > >> > > > > > > > > needs to be aware that OORE maybe > thrown > > >> and > > >> > she > > >> > > > > > needs to > > >> > > > > > > >> > > handle > > >> > > > > > > >> > > > > it or > > >> > > > > > > >> > > > > > > rely > > >> > > > > > > >> > > > > > > > > on reset policy which should not > surprise > > >> her. > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > I'm +1 on the KIP. > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > Guozhang > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > On Wed, Jan 24, 2018 at 12:31 AM, Dong > > Lin > > >> < > > >> > > > > > > >> > > lindon...@gmail.com> > > >> > > > > > > >> > > > > > > wrote: > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > > Yes, in general we can not prevent > > >> > > > > > > >> OffsetOutOfRangeException > > >> > > > > > > >> > > if > > >> > > > > > > >> > > > > user > > >> > > > > > > >> > > > > > > > > seeks > > >> > > > > > > >> > > > > > > > > > to a wrong offset. The main goal is > to > > >> > prevent > > >> > > > > > > >> > > > > > > OffsetOutOfRangeException > > >> > > > > > > >> > > > > > > > > if > > >> > > > > > > >> > > > > > > > > > user has done things in the right > way, > > >> e.g. > > >> > > user > > >> > > > > > should > > >> > > > > > > >> know > > >> > > > > > > >> > > that > > >> > > > > > > >> > > > > > > there > > >> > > > > > > >> > > > > > > > > is > > >> > > > > > > >> > > > > > > > > > message with this offset. 
> > >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > For example, if user calls seek(..) > > right > > >> > > after > > >> > > > > > > >> > > construction, the > > >> > > > > > > >> > > > > > > only > > >> > > > > > > >> > > > > > > > > > reason I can think of is that user > > stores > > >> > > offset > > >> > > > > > > >> externally. > > >> > > > > > > >> > > In > > >> > > > > > > >> > > > > this > > >> > > > > > > >> > > > > > > > > case, > > >> > > > > > > >> > > > > > > > > > user currently needs to use the > offset > > >> which > > >> > > is > > >> > > > > > obtained > > >> > > > > > > >> > > using > > >> > > > > > > >> > > > > > > > > position(..) > > >> > > > > > > >> > > > > > > > > > from the last run. With this KIP, > user > > >> needs > > >> > > to > > >> > > > > get > > >> > > > > > the > > >> > > > > > > >> > > offset > > >> > > > > > > >> > > > > and > > >> > > > > > > >> > > > > > > the > > >> > > > > > > >> > > > > > > > > > offsetEpoch using > > >> > positionAndOffsetEpoch(...) > > >> > > > and > > >> > > > > > stores > > >> > > > > > > >> > > these > > >> > > > > > > >> > > > > > > > > information > > >> > > > > > > >> > > > > > > > > > externally. The next time user starts > > >> > > consumer, > > >> > > > > > he/she > > >> > > > > > > >> needs > > >> > > > > > > >> > > to > > >> > > > > > > >> > > > > call > > >> > > > > > > >> > > > > > > > > > seek(..., offset, offsetEpoch) right > > >> after > > >> > > > > > construction. > > >> > > > > > > >> > > Then KIP > > >> > > > > > > >> > > > > > > should > > >> > > > > > > >> > > > > > > > > be > > >> > > > > > > >> > > > > > > > > > able to ensure that we don't throw > > >> > > > > > > >> OffsetOutOfRangeException > > >> > > > > > > >> > > if > > >> > > > > > > >> > > > > > > there is > > >> > > > > > > >> > > > > > > > > no > > >> > > > > > > >> > > > > > > > > > unclean leader election. > > >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > Does this sound OK? > > >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > Regards, > > >> > > > > > > >> > > > > > > > > > Dong > > >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > On Tue, Jan 23, 2018 at 11:44 PM, > > >> Guozhang > > >> > > Wang > > >> > > > < > > >> > > > > > > >> > > > > wangg...@gmail.com> > > >> > > > > > > >> > > > > > > > > > wrote: > > >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > "If consumer wants to consume > message > > >> with > > >> > > > > offset > > >> > > > > > 16, > > >> > > > > > > >> then > > >> > > > > > > >> > > > > consumer > > >> > > > > > > >> > > > > > > > > must > > >> > > > > > > >> > > > > > > > > > > have > > >> > > > > > > >> > > > > > > > > > > already fetched message with offset > > 15" > > >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > --> this may not be always true > > right? > > >> > What > > >> > > if > > >> > > > > > > >> consumer > > >> > > > > > > >> > > just > > >> > > > > > > >> > > > > call > > >> > > > > > > >> > > > > > > > > > seek(16) > > >> > > > > > > >> > > > > > > > > > > after construction and then poll > > >> without > > >> > > > > committed > > >> > > > > > > >> offset > > >> > > > > > > >> > > ever > > >> > > > > > > >> > > > > > > stored > > >> > > > > > > >> > > > > > > > > > > before? 
Admittedly it is rare but > we > > do > > >> > not > > >> > > > > > > >> programmably > > >> > > > > > > >> > > > > disallow > > >> > > > > > > >> > > > > > > it. > > >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > Guozhang > > >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > On Tue, Jan 23, 2018 at 10:42 PM, > > Dong > > >> > Lin < > > >> > > > > > > >> > > > > lindon...@gmail.com> > > >> > > > > > > >> > > > > > > > > wrote: > > >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > Hey Guozhang, > > >> > > > > > > >> > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > Thanks much for reviewing the > KIP! > > >> > > > > > > >> > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > In the scenario you described, > > let's > > >> > > assume > > >> > > > > that > > >> > > > > > > >> broker > > >> > > > > > > >> > > A has > > >> > > > > > > >> > > > > > > > > messages > > >> > > > > > > >> > > > > > > > > > > with > > >> > > > > > > >> > > > > > > > > > > > offset up to 10, and broker B has > > >> > messages > > >> > > > > with > > >> > > > > > > >> offset > > >> > > > > > > >> > > up to > > >> > > > > > > >> > > > > 20. > > >> > > > > > > >> > > > > > > If > > >> > > > > > > >> > > > > > > > > > > > consumer wants to consume message > > >> with > > >> > > > offset > > >> > > > > > 9, it > > >> > > > > > > >> will > > >> > > > > > > >> > > not > > >> > > > > > > >> > > > > > > receive > > >> > > > > > > >> > > > > > > > > > > > OffsetOutOfRangeException > > >> > > > > > > >> > > > > > > > > > > > from broker A. > > >> > > > > > > >> > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > If consumer wants to consume > > message > > >> > with > > >> > > > > offset > > >> > > > > > > >> 16, then > > >> > > > > > > >> > > > > > > consumer > > >> > > > > > > >> > > > > > > > > must > > >> > > > > > > >> > > > > > > > > > > > have already fetched message with > > >> offset > > >> > > 15, > > >> > > > > > which > > >> > > > > > > >> can > > >> > > > > > > >> > > only > > >> > > > > > > >> > > > > come > > >> > > > > > > >> > > > > > > from > > >> > > > > > > >> > > > > > > > > > > > broker B. Because consumer will > > fetch > > >> > from > > >> > > > > > broker B > > >> > > > > > > >> only > > >> > > > > > > >> > > if > > >> > > > > > > >> > > > > > > > > leaderEpoch > > >> > > > > > > >> > > > > > > > > > > >= > > >> > > > > > > >> > > > > > > > > > > > 2, then the current consumer > > >> leaderEpoch > > >> > > can > > >> > > > > > not be > > >> > > > > > > >> 1 > > >> > > > > > > >> > > since > > >> > > > > > > >> > > > > this > > >> > > > > > > >> > > > > > > KIP > > >> > > > > > > >> > > > > > > > > > > > prevents leaderEpoch rewind. Thus > > we > > >> > will > > >> > > > not > > >> > > > > > have > > >> > > > > > > >> > > > > > > > > > > > OffsetOutOfRangeException > > >> > > > > > > >> > > > > > > > > > > > in this case. > > >> > > > > > > >> > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > Does this address your question, > or > > >> > maybe > > >> > > > > there > > >> > > > > > is > > >> > > > > > > >> more > > >> > > > > > > >> > > > > advanced > > >> > > > > > > >> > > > > > > > > > scenario > > >> > > > > > > >> > > > > > > > > > > > that the KIP does not handle? 
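A minimal sketch of the fencing logic implied in the reply above, assuming the consumer tracks the leaderEpoch of the last message it consumed for each partition, as the KIP proposes (class and method names are illustrative, not the actual client code):

// Illustrative only: metadata (or a broker) whose leaderEpoch is older than the
// epoch of the last consumed message must not be used for the next fetch.
public final class MetadataFreshnessCheck {

    static boolean usable(int lastConsumedLeaderEpoch, int metadataLeaderEpoch) {
        return metadataLeaderEpoch >= lastConsumedLeaderEpoch;
    }

    public static void main(String[] args) {
        int lastConsumedLeaderEpoch = 2;  // offset 15 was produced under epoch 2 (broker B)
        int staleMetadataEpoch = 1;       // outdated metadata still pointing at broker A
        int freshMetadataEpoch = 2;

        // With stale metadata rejected, the consumer keeps refreshing instead of
        // fetching offset 16 from the old leader and hitting OffsetOutOfRangeException.
        System.out.println("stale metadata usable? "
                + usable(lastConsumedLeaderEpoch, staleMetadataEpoch));   // false
        System.out.println("fresh metadata usable? "
                + usable(lastConsumedLeaderEpoch, freshMetadataEpoch));   // true
    }
}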
> > >> > > > > > > >> > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > Thanks, > > >> > > > > > > >> > > > > > > > > > > > Dong > > >> > > > > > > >> > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > On Tue, Jan 23, 2018 at 9:43 PM, > > >> > Guozhang > > >> > > > > Wang < > > >> > > > > > > >> > > > > > > wangg...@gmail.com> > > >> > > > > > > >> > > > > > > > > > > wrote: > > >> > > > > > > >> > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > Thanks Dong, I made a pass over > > the > > >> > wiki > > >> > > > and > > >> > > > > > it > > >> > > > > > > >> lgtm. > > >> > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > Just a quick question: can we > > >> > completely > > >> > > > > > > >> eliminate the > > >> > > > > > > >> > > > > > > > > > > > > OffsetOutOfRangeException with > > this > > >> > > > > approach? > > >> > > > > > Say > > >> > > > > > > >> if > > >> > > > > > > >> > > there > > >> > > > > > > >> > > > > is > > >> > > > > > > >> > > > > > > > > > > consecutive > > >> > > > > > > >> > > > > > > > > > > > > leader changes such that the > > cached > > >> > > > > metadata's > > >> > > > > > > >> > > partition > > >> > > > > > > >> > > > > epoch > > >> > > > > > > >> > > > > > > is > > >> > > > > > > >> > > > > > > > > 1, > > >> > > > > > > >> > > > > > > > > > > and > > >> > > > > > > >> > > > > > > > > > > > > the metadata fetch response > > returns > > >> > > with > > >> > > > > > > >> partition > > >> > > > > > > >> > > epoch 2 > > >> > > > > > > >> > > > > > > > > pointing > > >> > > > > > > >> > > > > > > > > > to > > >> > > > > > > >> > > > > > > > > > > > > leader broker A, while the > actual > > >> > > > up-to-date > > >> > > > > > > >> metadata > > >> > > > > > > >> > > has > > >> > > > > > > >> > > > > > > partition > > >> > > > > > > >> > > > > > > > > > > > epoch 3 > > >> > > > > > > >> > > > > > > > > > > > > whose leader is now broker B, > the > > >> > > metadata > > >> > > > > > > >> refresh will > > >> > > > > > > >> > > > > still > > >> > > > > > > >> > > > > > > > > succeed > > >> > > > > > > >> > > > > > > > > > > and > > >> > > > > > > >> > > > > > > > > > > > > the follow-up fetch request may > > >> still > > >> > > see > > >> > > > > > OORE? 
> > >> > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > Guozhang > > >> > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > On Tue, Jan 23, 2018 at 3:47 > PM, > > >> Dong > > >> > > Lin > > >> > > > < > > >> > > > > > > >> > > > > lindon...@gmail.com > > >> > > > > > > >> > > > > > > > > > >> > > > > > > >> > > > > > > > > > wrote: > > >> > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > Hi all, > > >> > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > I would like to start the > > voting > > >> > > process > > >> > > > > for > > >> > > > > > > >> KIP-232: > > >> > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > https://cwiki.apache.org/ > > >> > > > > > > >> > > confluence/display/KAFKA/KIP- > > >> > > > > > > >> > > > > > > > > > > > > > > 232%3A+Detect+outdated+metadat > > >> > > > > > > >> a+using+leaderEpoch+ > > >> > > > > > > >> > > > > > > > > > and+partitionEpoch > > >> > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > The KIP will help fix a > > >> concurrency > > >> > > > issue > > >> > > > > in > > >> > > > > > > >> Kafka > > >> > > > > > > >> > > which > > >> > > > > > > >> > > > > > > > > currently > > >> > > > > > > >> > > > > > > > > > > can > > >> > > > > > > >> > > > > > > > > > > > > > cause message loss or message > > >> > > > duplication > > >> > > > > in > > >> > > > > > > >> > > consumer. > > >> > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > Regards, > > >> > > > > > > >> > > > > > > > > > > > > > Dong > > >> > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > -- > > >> > > > > > > >> > > > > > > > > > > > > -- Guozhang > > >> > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > -- > > >> > > > > > > >> > > > > > > > > > > -- Guozhang > > >> > > > > > > >> > > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > -- > > >> > > > > > > >> > > > > > > > > -- Guozhang > > >> > > > > > > >> > > > > > > > > > > >> > > > > > > >> > > > > > > > > >> > > > > > > >> > > > > > > >> > > > > > > >> > > > > >> > > > > > > >> > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > > > > > > > >