[ https://issues.apache.org/jira/browse/KAFKA-7142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-7142.
----------------------------------
    Resolution: Fixed
 Fix Version/s: 2.1.0

Resolving the ticket since the PR has been merged into 2.1.0.

> Rebalancing a large consumer group can block the coordinator broker for several seconds
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7142
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7142
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.9.0.0, 0.10.0.0, 0.10.1.0, 0.10.2.0, 0.11.0.0, 1.0.0, 1.1.0
>            Reporter: Ying Zheng
>            Assignee: Ying Zheng
>            Priority: Major
>             Fix For: 2.1.0
>
>
> In our production cluster, we noticed that when a large consumer group (a few thousand members) is rebalancing, the produce latency of the coordinator broker can jump to several seconds.
>
> Group rebalancing is a very frequent operation; it can be triggered by adding, removing, or restarting a single member of the consumer group.
>
> When this happens, jstack shows that all the request handler threads of the broker are waiting for the group lock:
> {noformat}
> "kafka-request-handler-7" #87 daemon prio=5 os_prio=0 tid=0x00007f9a32b16000 nid=0x1b985 waiting on condition [0x00007f98f1adb000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for <0x000000024aa73b20> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
>         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
>         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
>         at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
>         at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
>         at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>         at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
>         at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
>         at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:745){noformat}
>
> All, that is, except one thread, which is running either GroupMetadata.supportsProtocols():
> {noformat}
> "kafka-request-handler-6" #86 daemon prio=5 os_prio=0 tid=0x00007f9a32b14000 nid=0x1b984 runnable [0x00007f98f1bdc000]
>    java.lang.Thread.State: RUNNABLE
>         at scala.collection.immutable.List.map(List.scala:284)
>         at kafka.coordinator.group.MemberMetadata.protocols(MemberMetadata.scala:68)
>         at kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
>         at kafka.coordinator.group.GroupMetadata$$anonfun$candidateProtocols$1.apply(GroupMetadata.scala:265)
>         at scala.collection.immutable.List.map(List.scala:288)
>         at kafka.coordinator.group.GroupMetadata.candidateProtocols(GroupMetadata.scala:265)
>         at kafka.coordinator.group.GroupMetadata.supportsProtocols(GroupMetadata.scala:270)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:153)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
>         at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
>         at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
>         at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:745){noformat}
> or GroupCoordinator.tryCompleteJoin():
> {noformat}
> "kafka-request-handler-8" #88 daemon prio=5 os_prio=0 tid=0x00007fe9f6ad1000 nid=0x1ceff runnable [0x00007fe8701ca000]
>    java.lang.Thread.State: RUNNABLE
>         at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
>         at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
>         at scala.collection.mutable.HashMap$$anon$2$$anonfun$foreach$3.apply(HashMap.scala:139)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
>         at scala.collection.mutable.HashMap$$anon$2.foreach(HashMap.scala:139)
>         at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
>         at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
>         at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
>         at kafka.coordinator.group.GroupMetadata.notYetRejoinedMembers(GroupMetadata.scala:229)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply$mcZ$sp(GroupCoordinator.scala:767)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply(GroupCoordinator.scala:767)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$tryCompleteJoin$1.apply(GroupCoordinator.scala:767)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:189)
>         at kafka.coordinator.group.GroupCoordinator.tryCompleteJoin(GroupCoordinator.scala:766)
>         at kafka.coordinator.group.DelayedJoin.tryComplete(DelayedJoin.scala:38)
>         at kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
>         at kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:396)
>         at kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:298)
>         at kafka.coordinator.group.GroupCoordinator$$anonfun$doJoinGroup$1.apply(GroupCoordinator.scala:233)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>         at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:189)
>         at kafka.coordinator.group.GroupCoordinator.doJoinGroup(GroupCoordinator.scala:152)
>         at kafka.coordinator.group.GroupCoordinator.handleJoinGroup(GroupCoordinator.scala:137)
>         at kafka.server.KafkaApis.handleJoinGroupRequest(KafkaApis.scala:1241)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:115)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>         at java.lang.Thread.run(Thread.java:745){noformat}
>
> Both GroupMetadata.supportsProtocols() and GroupCoordinator.tryCompleteJoin() are O(N) operations, and since they are invoked once per joining member, group rebalancing becomes an O(N^2) operation. No matter how many brokers are in the cluster or how many cores each broker has, these consumer group operations can only be processed by a single thread.
> My trace log messages show that each GroupMetadata.supportsProtocols() call on a 3000-member group takes 30 ms on average.
> Both operations can be done in O(1) time by maintaining data structures that track the supported protocols and the number of "not yet joined" members as members are added, removed, or updated.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
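The O(1) bookkeeping the ticket proposes can be sketched as follows. This is a minimal illustration in Java (the broker itself is Scala), not the actual Kafka patch: the class and method names are hypothetical. The idea is to keep a per-protocol support count and a counter of members awaiting rejoin, so that the protocol-compatibility check and the join-completion check no longer scan all N members.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the O(1) bookkeeping idea (not the real
// GroupMetadata): counters are updated on each membership change,
// so the two hot-path checks become constant time.
class GroupProtocolTracker {
    // protocol name -> number of current members that support it
    private final Map<String, Integer> supportCount = new HashMap<>();
    private int memberCount = 0;
    private int awaitingJoin = 0; // members that have not yet rejoined

    // O(P) in the member's own protocol list, not O(N) in group size.
    void addMember(Set<String> protocols) {
        for (String p : protocols) supportCount.merge(p, 1, Integer::sum);
        memberCount++;
        awaitingJoin++;
    }

    void removeMember(Set<String> protocols, boolean hadRejoined) {
        for (String p : protocols)
            supportCount.computeIfPresent(p, (k, v) -> v == 1 ? null : v - 1);
        memberCount--;
        if (!hadRejoined) awaitingJoin--;
    }

    void memberRejoined() { awaitingJoin--; }

    // Replaces the O(N) candidateProtocols/supportsProtocols scan:
    // a protocol is viable iff every current member supports it.
    boolean supportsProtocol(String protocol) {
        return memberCount > 0
            && supportCount.getOrDefault(protocol, 0) == memberCount;
    }

    // Replaces the O(N) notYetRejoinedMembers filter in tryCompleteJoin.
    boolean allMembersRejoined() {
        return awaitingJoin == 0;
    }
}
```

With this structure, each JoinGroup request does O(1) work under the group lock (plus O(P) for the member's own protocol list), so a full rebalance of N members costs O(N) instead of O(N^2).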