This is an automated email from the ASF dual-hosted git repository.
sriharsha pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b01f8fb KAFKA-7142: fix joinGroup performance issues (#5354)
b01f8fb is described below
commit b01f8fb668988caa19feb63e878fe1901a9d0c89
Author: ying-zheng <[email protected]>
AuthorDate: Mon Aug 6 13:20:40 2018 -0700
KAFKA-7142: fix joinGroup performance issues (#5354)
Summary:
1. Revert GroupMetadata.members to private
2. Add back a wrongly removed comment
3. In GroupMetadata.remove(), update supportedProtocols and
numMembersAwaitingJoin only when the removal succeeded
Reviewers: Jason Gustafson <[email protected]>, Ismael Juma
<[email protected]>, Sriharsha Chintalapani <[email protected]>
---
.../kafka/coordinator/group/GroupCoordinator.scala | 17 +++----
.../kafka/coordinator/group/GroupMetadata.scala | 53 +++++++++++++++++++---
.../group/GroupMetadataManagerTest.scala | 9 ++--
.../coordinator/group/GroupMetadataTest.scala | 3 +-
4 files changed, 56 insertions(+), 26 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 6ca443f..c4e6dc9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -600,11 +600,9 @@ class GroupCoordinator(val brokerId: Int,
case Empty | Dead =>
case PreparingRebalance =>
for (member <- group.allMemberMetadata) {
- if (member.awaitingJoinCallback != null) {
- member.awaitingJoinCallback(joinError(member.memberId,
Errors.NOT_COORDINATOR))
- member.awaitingJoinCallback = null
- }
+ group.invokeJoinCallback(member, joinError(member.memberId,
Errors.NOT_COORDINATOR))
}
+
joinPurgatory.checkAndComplete(GroupKey(group.groupId))
case Stable | CompletingRebalance =>
@@ -704,12 +702,11 @@ class GroupCoordinator(val brokerId: Int,
val memberId = clientId + "-" + group.generateMemberIdSuffix
val member = new MemberMetadata(memberId, group.groupId, clientId,
clientHost, rebalanceTimeoutMs,
sessionTimeoutMs, protocolType, protocols)
- member.awaitingJoinCallback = callback
// update the newMemberAdded flag to indicate that the join group can be
further delayed
if (group.is(PreparingRebalance) && group.generationId == 0)
group.newMemberAdded = true
- group.add(member)
+ group.add(member, callback)
maybePrepareRebalance(group, s"Adding new member $memberId")
member
}
@@ -718,8 +715,7 @@ class GroupCoordinator(val brokerId: Int,
member: MemberMetadata,
protocols: List[(String, Array[Byte])],
callback: JoinCallback) {
- member.supportedProtocols = protocols
- member.awaitingJoinCallback = callback
+ group.updateMember(member, protocols, callback)
maybePrepareRebalance(group, s"Updating metadata for member
${member.memberId}")
}
@@ -765,7 +761,7 @@ class GroupCoordinator(val brokerId: Int,
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
group.inLock {
- if (group.notYetRejoinedMembers.isEmpty)
+ if (group.hasAllMembersJoined)
forceComplete()
else false
}
@@ -816,8 +812,7 @@ class GroupCoordinator(val brokerId: Int,
leaderId = group.leaderOrNull,
error = Errors.NONE)
- member.awaitingJoinCallback(joinResult)
- member.awaitingJoinCallback = null
+ group.invokeJoinCallback(member, joinResult)
completeAndScheduleNextHeartbeatExpiration(group, member)
}
}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index d729449..cbe78e9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -128,7 +128,7 @@ private object GroupMetadata {
group.protocol = Option(protocol)
group.leaderId = Option(leaderId)
group.currentStateTimestamp = currentStateTimestamp
- members.foreach(group.add)
+ members.foreach(group.add(_, null))
group
}
}
@@ -172,6 +172,8 @@ case class
CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offs
*/
@nonthreadsafe
private[group] class GroupMetadata(val groupId: String, initialState:
GroupState, time: Time) extends Logging {
+ type JoinCallback = JoinGroupResult => Unit
+
private[group] val lock = new ReentrantLock
private var state: GroupState = initialState
@@ -182,6 +184,8 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
private var protocol: Option[String] = None
private val members = new mutable.HashMap[String, MemberMetadata]
+ private var numMembersAwaitingJoin = 0
+ private val supportedProtocols = new mutable.HashMap[String,
Integer]().withDefaultValue(0)
private val offsets = new mutable.HashMap[TopicPartition,
CommitRecordMetadataAndOffset]
private val pendingOffsetCommits = new mutable.HashMap[TopicPartition,
OffsetAndMetadata]
private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long,
mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
@@ -202,7 +206,7 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
def protocolOrNull: String = protocol.orNull
def currentStateTimestampOrDefault: Long =
currentStateTimestamp.getOrElse(-1)
- def add(member: MemberMetadata) {
+ def add(member: MemberMetadata, callback: JoinCallback = null) {
if (members.isEmpty)
this.protocolType = Some(member.protocolType)
@@ -213,10 +217,19 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
if (leaderId.isEmpty)
leaderId = Some(member.memberId)
members.put(member.memberId, member)
+ member.supportedProtocols.foreach{ case (protocol, _) =>
supportedProtocols(protocol) += 1 }
+ member.awaitingJoinCallback = callback
+ if (member.awaitingJoinCallback != null)
+ numMembersAwaitingJoin += 1;
}
def remove(memberId: String) {
- members.remove(memberId)
+ members.remove(memberId).foreach { member =>
+ member.supportedProtocols.foreach{ case (protocol, _) =>
supportedProtocols(protocol) -= 1 }
+ if (member.awaitingJoinCallback != null)
+ numMembersAwaitingJoin -= 1
+ }
+
if (isLeader(memberId)) {
leaderId = if (members.isEmpty) {
None
@@ -230,6 +243,8 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback ==
null).toList
+ def hasAllMembersJoined = members.size <= numMembersAwaitingJoin
+
def allMembers = members.keySet
def allMemberMetadata = members.values.toList
@@ -268,13 +283,37 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
private def candidateProtocols = {
// get the set of protocols that are commonly supported by all members
- allMemberMetadata
- .map(_.protocols)
- .reduceLeft((commonProtocols, protocols) => commonProtocols & protocols)
+ val numMembers = members.size
+ supportedProtocols.filter(_._2 == numMembers).map(_._1).toSet
}
def supportsProtocols(memberProtocols: Set[String]) = {
- members.isEmpty || (memberProtocols & candidateProtocols).nonEmpty
+ val numMembers = members.size
+ members.isEmpty || memberProtocols.exists(supportedProtocols(_) ==
numMembers)
+ }
+
+ def updateMember(member: MemberMetadata,
+ protocols: List[(String, Array[Byte])],
+ callback: JoinCallback) = {
+ member.supportedProtocols.foreach{ case (protocol, _) =>
supportedProtocols(protocol) -= 1 }
+ protocols.foreach{ case (protocol, _) => supportedProtocols(protocol) += 1
}
+ member.supportedProtocols = protocols
+
+ if (callback != null && member.awaitingJoinCallback == null) {
+ numMembersAwaitingJoin += 1;
+ } else if (callback == null && member.awaitingJoinCallback != null) {
+ numMembersAwaitingJoin -= 1;
+ }
+ member.awaitingJoinCallback = callback
+ }
+
+ def invokeJoinCallback(member: MemberMetadata,
+ joinGroupResult: JoinGroupResult) : Unit = {
+ if (member.awaitingJoinCallback != null) {
+ member.awaitingJoinCallback(joinGroupResult)
+ member.awaitingJoinCallback = null
+ numMembersAwaitingJoin -= 1;
+ }
}
def initNextGeneration() = {
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 77e6fdc..21c1365 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -843,8 +843,7 @@ class GroupMetadataManagerTest {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost,
rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
- member.awaitingJoinCallback = _ => ()
- group.add(member)
+ group.add(member, _ => ())
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
@@ -873,8 +872,7 @@ class GroupMetadataManagerTest {
val member = new MemberMetadata(memberId, groupId, clientId, clientHost,
rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol", Array[Byte]())))
- member.awaitingJoinCallback = _ => ()
- group.add(member)
+ group.add(member, _ => ())
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
@@ -1372,8 +1370,7 @@ class GroupMetadataManagerTest {
val subscription = new Subscription(List(topic).asJava)
val member = new MemberMetadata(memberId, groupId, clientId, clientHost,
rebalanceTimeout, sessionTimeout,
protocolType, List(("protocol",
ConsumerProtocol.serializeSubscription(subscription).array())))
- member.awaitingJoinCallback = _ => ()
- group.add(member)
+ group.add(member, _ => ())
group.transitionTo(PreparingRebalance)
group.initNextGeneration()
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 9054533..ac12804 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -266,8 +266,7 @@ class GroupMetadataTest extends JUnitSuite {
protocolType, List(("roundrobin", Array.empty[Byte])))
group.transitionTo(PreparingRebalance)
- member.awaitingJoinCallback = _ => ()
- group.add(member)
+ group.add(member, _ => ())
assertEquals(0, group.generationId)
assertNull(group.protocolOrNull)