> On May 21, 2015, 12:16 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 
> > 102-106
> > <https://reviews.apache.org/r/34450/diff/2/?file=965426#file965426line102>
> >
> >     Another way to do this is to only load from ZK on the becoming leader 
> > event for an offsetTopic partition. Then, we don't have to read from ZK 
> > during join group, which will introduce unnecessary overhead when joining a 
> > new group.
> 
> Guozhang Wang wrote:
>     I thought about this while working on the patch. The reason I feel it may 
> not be worth doing the loading upon become-leader is that:
>     
>     1. When we are loading from ZK, we probably need to still reject any 
> join-group request which is not loaded yet, like what we did in offset 
> manager; this will introduce two more round trips (one for rediscover 
> coordinator and one for another join-group, unless we introduce a separate 
> "loading in progress" error code, then we can reduce it to one) compared with 
> loading from ZK on the fly, which is just one ZK read.
>     
>     2. It is likely that we only need to load from ZK once for each group, 
> upon the first join-group request received (when two join requests are 
> received at the same time we may need to unnecessarily read twice). And hence 
> the latency overhead is not much compared with loading-all-at-once. The only 
> concern is that it will slow down all handler threads a little bit when 
> coordinator migration happens instead of taking one thread for reading all 
> the ZK paths, which I feel is OK.

Yes, there are trade-offs between preloading from ZK on coordinator change and 
lazy loading from ZK on HeartBeat.

For lazy reading from ZK, we need to worry about the following.
a. The group may be cached in ConsumerCoordinator, but it's outdated. This can 
happen when the coordinator is moved to another node and then is moved back 
(without the node being restarted). So, we need a way to detect if the cache is 
outdated.
b. We need to worry about concurrent updates to the group and their ordering, 
since the group cached in ConsumerCoordinator can now be updated directly and 
also by loading from ZK. For example, suppose one API thread is handling a HB 
request, doesn't see the group in the cache, and is about to load the group 
from ZK. At the same time, a separate API thread is handling a new consumer's 
join-group request; it loads the group from ZK and updates the group with the 
new consumer. Now, the thread handling the HB can overwrite the updated group 
after it completes its own load from ZK.
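
One way to avoid the race in (b) is to make "check the cache, then load" a 
single atomic step per group. The sketch below is only an illustration in Java, 
not the patch's code: LazyGroupCache, Group, and zkLoader are hypothetical 
names, and the ZK read is represented by a plain function.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class LazyGroupCache {
    /** Hypothetical stand-in for the group metadata held by the coordinator. */
    static final class Group {
        final String id;
        final Set<String> members = ConcurrentHashMap.newKeySet();
        Group(String id) { this.id = id; }
    }

    private final Map<String, Group> cache = new ConcurrentHashMap<>();
    // Hypothetical: reads the group registry from ZK for the given group id.
    private final Function<String, Group> zkLoader;

    LazyGroupCache(Function<String, Group> zkLoader) {
        this.zkLoader = zkLoader;
    }

    /**
     * Returns the cached group, loading it on a miss. computeIfAbsent runs the
     * loader at most once per missing key, so a HB thread and a join-group
     * thread that miss at the same time end up sharing one Group instance and
     * neither can replace the other's updates with a second copy.
     */
    Group getOrLoad(String groupId) {
        return cache.computeIfAbsent(groupId, zkLoader);
    }

    /** Drops everything, e.g. when this node stops being the coordinator (a). */
    void invalidateAll() {
        cache.clear();
    }
}
```

The cost is that the ZK read happens inside computeIfAbsent, which can block 
other updates to the same part of the map while the load is in progress. 
Clearing the cache when the coordinator moves away is one possible answer to 
(a), assuming the node reliably sees that event.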

For preloading from ZK,
1. It takes a bit of time to complete the loading from ZK. During this time, if 
we get a HB request, we can just return a CONSUMER_COORDINATOR_NOT_AVAILABLE 
error, which forces the client to rediscover the coordinator. However, this is 
unlikely to happen. The consumer by default only sends a HB every 10 secs. We 
can probably complete the loading from ZK in a couple of secs even with 1000 
groups.
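
The preloading behaviour described above can be sketched roughly as follows. 
This is a Java illustration under assumptions, not the actual coordinator: 
PreloadingCoordinator, ErrorCode, and the loadGroupsFromZk callback are 
hypothetical, and only the CONSUMER_COORDINATOR_NOT_AVAILABLE name comes from 
the discussion.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class PreloadingCoordinator {
    /** Hypothetical result codes for a heartbeat. */
    enum ErrorCode { NONE, CONSUMER_COORDINATOR_NOT_AVAILABLE }

    // Offsets topic partitions whose groups have been fully loaded from ZK.
    private final Set<Integer> ready = ConcurrentHashMap.newKeySet();

    /** Called on the become-leader event for an offsets topic partition. */
    void onBecomeLeader(int partition, Runnable loadGroupsFromZk) {
        // Hypothetical callback: reads every group registry for this partition.
        loadGroupsFromZk.run();
        // The partition is served only after the whole load has finished.
        ready.add(partition);
    }

    /** Called when this node stops leading the partition. */
    void onBecomeFollower(int partition) {
        ready.remove(partition);
    }

    /** Rejects heartbeats for partitions that are not led here or still loading. */
    ErrorCode handleHeartbeat(int partition) {
        return ready.contains(partition)
            ? ErrorCode.NONE
            : ErrorCode.CONSUMER_COORDINATOR_NOT_AVAILABLE;
    }
}
```

With this shape, no request handler reads from ZK; a HB that arrives during the 
load window simply gets the error and the client retries after rediscovery.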


> On May 21, 2015, 12:16 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 
> > 369-381
> > <https://reviews.apache.org/r/34450/diff/2/?file=965426#file965426line369>
> >
> >     I was thinking whether it's worth including the leader epoch (of the 
> > corresponding offset topic partition) in the ZK value as we did for 
> > leaderAndIsr to prevent a zombie consumer coordinator from overwriting the 
> > value, during a soft failure. I am not sure if it's worth doing this 
> > immediately because
> >     
> >     1. When this happens, consumers can still recover after the heartbeat 
> > fails.
> >     2. It seems that doing this right is a bit more complicated. We need to 
> > keep the leader epoch in the ZK value. However, during a leader change, we 
> > probably need to update the values in ZK with the new leader epoch as well, 
> > in order to truly prevent the zombie coordinator from overwriting the 
> > value.
> >     
> >     So, I think for now, we can just use the simple approach in this patch.
> 
> Guozhang Wang wrote:
>     I think this is handled by the generation id, which is ever increasing, 
> and coordinator writing to ZK must have its generation id = ZK value + 1.
>     
>     One caveat, though, is that when a group is empty we will remove it from 
> ZK, and when it appears again we will take it as a new group with generation 
> id reset to 1. Then a zombie coordinator that happens to hold the "right" 
> generation id after the reset may be able to overwrite it. For this case we 
> can create another JIRA.

The check in the current patch may not be enough.
1. The check and the update are not atomic. So, the generation id in ZK 
could change after it's read from ZK and the check is done.
2. The new coordinator may not do a rebalance for a long time. Therefore, the 
generation id in ZK is still the old one and the zombie consumer coordinator 
can still overwrite the value in ZK.
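
To illustrate point 1: ZooKeeper's setData takes an expected znode version and 
fails if the znode has changed since it was read, which turns the check and the 
update into one atomic step. The Java sketch below mimics that with an 
in-memory stand-in; VersionedStore and its methods are hypothetical and do not 
talk to ZK.

```java
final class VersionedStore {
    /** Snapshot of the stored generation id and the version it was read at. */
    static final class Versioned {
        final int generationId;
        final int version;
        Versioned(int generationId, int version) {
            this.generationId = generationId;
            this.version = version;
        }
    }

    private int generationId;
    private int version = 0;

    VersionedStore(int initialGenerationId) {
        this.generationId = initialGenerationId;
    }

    synchronized Versioned read() {
        return new Versioned(generationId, version);
    }

    /**
     * Writes only if nobody has written since expectedVersion was read.
     * Returns false instead of overwriting when the version has moved on.
     */
    synchronized boolean compareAndSet(int expectedVersion, int newGenerationId) {
        if (expectedVersion != version) {
            return false; // stale writer, e.g. a zombie coordinator
        }
        generationId = newGenerationId;
        version++;
        return true;
    }
}
```

A writer that read the value before another coordinator updated it gets false 
back rather than silently replacing the newer value. This does not cover point 
2 by itself: if the new coordinator has not written anything yet, the zombie's 
expected version is still current and its write succeeds.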


- Jun


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34450/#review84604
-----------------------------------------------------------


On May 22, 2015, 2:03 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34450/
> -----------------------------------------------------------
> 
> (Updated May 22, 2015, 2:03 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2017
>     https://issues.apache.org/jira/browse/KAFKA-2017
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> 1. Upon receiving join-group, if the group metadata cannot be found in the 
> local cache try to read it from ZK; 
> 2. Upon completing rebalance, update the ZK with new group registry or delete 
> the registry if the group becomes empty.
> 
> Address Jun's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
> af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e 
>   core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 
> 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd 
>   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala 
> c39e6de34ee531c6dfa9107b830752bd7f8fbe59 
>   core/src/main/scala/kafka/utils/ZkUtils.scala 
> 2618dd39b925b979ad6e4c0abd5c6eaafb3db5d5 
> 
> Diff: https://reviews.apache.org/r/34450/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>
