This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d294b94 KAFKA-12890; Consumer group stuck in `CompletingRebalance` (#10863)
d294b94 is described below
commit d294b946ca78c9b8caf16547cf2fa6ed348a3033
Author: David Jacot <[email protected]>
AuthorDate: Thu Jun 17 13:46:16 2021 +0200
KAFKA-12890; Consumer group stuck in `CompletingRebalance` (#10863)
This patch introduces a new delayed operation which effectively ensures
that a SyncGroup request is received from all the stable members in the group
within the rebalance timeout. The timer starts when the group transitions to
the `CompletingRebalance` state. The previous mechanism, based on
`DelayedHeartbeat`, no longer worked because
https://github.com/apache/kafka/pull/8834 allows heartbeats while the group
is in the `CompletingRebalance` state.
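For orientation, a minimal, self-contained Scala sketch of the pending-sync
bookkeeping the patch adds. Names and flow are simplified; the real types
(`DelayedSync`, `GroupSyncKey`, `pendingSyncMembers`) are in the diff below.

    // Simplified model: every stable member owes the coordinator a
    // SyncGroup request before the rebalance timeout expires.
    object PendingSyncSketch extends App {
      // Armed when the group transitions to CompletingRebalance.
      var pendingSync: Set[String] = Set("consumer-1", "consumer-2", "consumer-3")

      // Each SyncGroup request removes its sender from the pending set;
      // once the set is empty, the delayed operation completes early.
      def onSyncGroup(memberId: String): Unit = {
        pendingSync -= memberId
        if (pendingSync.isEmpty) println("DelayedSync completed: all members synced")
      }

      // If the timer fires first, members that never synced are removed
      // and the group is rebalanced.
      def onExpiration(): Unit =
        if (pendingSync.nonEmpty) println(s"Removing $pendingSync and rebalancing")

      onSyncGroup("consumer-1")
      onSyncGroup("consumer-2")
      onExpiration() // consumer-3 never synced -> kicked out, group rebalances
    }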
Reviewers: Luke Chen <[email protected]>, Jason Gustafson <[email protected]>
---
.../kafka/coordinator/group/DelayedJoin.scala | 36 ++--
.../kafka/coordinator/group/DelayedRebalance.scala | 34 ++++
.../kafka/coordinator/group/DelayedSync.scala | 48 +++++
.../kafka/coordinator/group/GroupCoordinator.scala | 137 +++++++++++--
.../kafka/coordinator/group/GroupMetadata.scala | 31 +++
.../scala/kafka/server/DelayedOperationKey.scala | 20 +-
.../group/GroupCoordinatorConcurrencyTest.scala | 8 +-
.../coordinator/group/GroupCoordinatorTest.scala | 211 ++++++++++++++++++++-
.../coordinator/group/GroupMetadataTest.scala | 46 +++++
9 files changed, 520 insertions(+), 51 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
index abebfc8..22dfa9d 100644
--- a/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedJoin.scala
@@ -17,7 +17,7 @@
package kafka.coordinator.group
-import kafka.server.{DelayedOperation, DelayedOperationPurgatory, GroupKey}
+import kafka.server.{DelayedOperationPurgatory, GroupJoinKey}
import scala.math.{max, min}
@@ -31,11 +31,16 @@ import scala.math.{max, min}
* the group are marked as failed, and complete this operation to proceed rebalance with
* the rest of the group.
*/
-private[group] class DelayedJoin(coordinator: GroupCoordinator,
-                                 group: GroupMetadata,
-                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
-
+private[group] class DelayedJoin(
+ coordinator: GroupCoordinator,
+ group: GroupMetadata,
+ rebalanceTimeout: Long
+) extends DelayedRebalance(
+ rebalanceTimeout,
+ group.lock
+) {
override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
+
override def onExpiration(): Unit = {
// try to complete delayed actions introduced by coordinator.onCompleteJoin
tryToCompleteDelayedAction()
@@ -54,13 +59,18 @@ private[group] class DelayedJoin(coordinator: GroupCoordinator,
* before the rebalance timeout. If both are true we then schedule a further delay. Otherwise we complete the
* rebalance.
*/
-private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
-                                        purgatory: DelayedOperationPurgatory[DelayedJoin],
-                                        group: GroupMetadata,
-                                        configuredRebalanceDelay: Int,
-                                        delayMs: Int,
-                                        remainingMs: Int) extends DelayedJoin(coordinator, group, delayMs) {
-
+private[group] class InitialDelayedJoin(
+ coordinator: GroupCoordinator,
+ purgatory: DelayedOperationPurgatory[DelayedRebalance],
+ group: GroupMetadata,
+ configuredRebalanceDelay: Int,
+ delayMs: Int,
+ remainingMs: Int
+) extends DelayedJoin(
+ coordinator,
+ group,
+ delayMs
+) {
override def tryComplete(): Boolean = false
override def onComplete(): Unit = {
@@ -75,7 +85,7 @@ private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
configuredRebalanceDelay,
delay,
remaining
- ), Seq(GroupKey(group.groupId)))
+ ), Seq(GroupJoinKey(group.groupId)))
} else
super.onComplete()
}
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/group/DelayedRebalance.scala
new file mode 100644
index 0000000..bad109a
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedRebalance.scala
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.server.DelayedOperation
+
+import java.util.concurrent.locks.Lock
+
+/**
+ * Delayed rebalance operation that is shared by DelayedJoin and DelayedSync
+ * operations. This allows us to use a common purgatory for both cases.
+ */
+private[group] abstract class DelayedRebalance(
+ rebalanceTimeoutMs: Long,
+ groupLock: Lock
+) extends DelayedOperation(
+ rebalanceTimeoutMs,
+ Some(groupLock)
+)
diff --git a/core/src/main/scala/kafka/coordinator/group/DelayedSync.scala b/core/src/main/scala/kafka/coordinator/group/DelayedSync.scala
new file mode 100644
index 0000000..a39adef
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/group/DelayedSync.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+/**
+ * Delayed rebalance operation that is added to the purgatory when the group is completing the
+ * rebalance.
+ *
+ * Whenever a SyncGroup request is received, this operation checks whether we have received the
+ * SyncGroup requests from all members of the group; if so, it completes.
+ *
+ * When the operation expires, any known members that have not sent a SyncGroup request are
+ * removed from the group. If any member is removed, the group is rebalanced.
+ */
+private[group] class DelayedSync(
+ coordinator: GroupCoordinator,
+ group: GroupMetadata,
+ generationId: Int,
+ rebalanceTimeoutMs: Long
+) extends DelayedRebalance(
+ rebalanceTimeoutMs,
+ group.lock
+) {
+ override def tryComplete(): Boolean = {
+ coordinator.tryCompletePendingSync(group, generationId, forceComplete _)
+ }
+
+ override def onExpiration(): Unit = {
+ coordinator.onExpirePendingSync(group, generationId)
+ }
+
+ override def onComplete(): Unit = { }
+}
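For readers unfamiliar with the purgatory contract `DelayedSync` plugs into,
a rough sketch of the `DelayedOperation` lifecycle follows (simplified; the
real `kafka.server.DelayedOperation` also handles locking, watch keys, and
the expiration reaper):

    // Illustrative model only, not the real kafka.server.DelayedOperation.
    abstract class DelayedOperationSketch(timeoutMs: Long) {
      @volatile private var completed = false

      // Invoked on every checkAndComplete(key); should call forceComplete()
      // and return true once the completion condition is satisfied.
      def tryComplete(): Boolean

      // Runs exactly once, whether completion or expiration wins the race.
      def onComplete(): Unit

      // Runs only if the timeout fired before tryComplete() succeeded.
      def onExpiration(): Unit

      def forceComplete(): Boolean =
        if (!completed) { completed = true; onComplete(); true }
        else false
    }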
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 3fc93de..50b00e3 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -53,7 +53,7 @@ class GroupCoordinator(val brokerId: Int,
val offsetConfig: OffsetConfig,
val groupManager: GroupMetadataManager,
val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
- val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
+ val rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
time: Time,
metrics: Metrics) extends Logging {
import GroupCoordinator._
@@ -119,7 +119,7 @@ class GroupCoordinator(val brokerId: Int,
isActive.set(false)
groupManager.shutdown()
heartbeatPurgatory.shutdown()
- joinPurgatory.shutdown()
+ rebalancePurgatory.shutdown()
info("Shutdown complete.")
}
@@ -215,7 +215,7 @@ class GroupCoordinator(val brokerId: Int,
// attempt to complete JoinGroup
if (group.is(PreparingRebalance)) {
- joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+ rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
}
}
}
@@ -564,6 +564,7 @@ class GroupCoordinator(val brokerId: Int,
case CompletingRebalance =>
group.get(memberId).awaitingSyncCallback = responseCallback
+ removePendingSyncMember(group, memberId)
// if this is the leader, then we can attempt to persist state and transition to stable
if (group.isLeader(memberId)) {
@@ -598,6 +599,8 @@ class GroupCoordinator(val brokerId: Int,
}
case Stable =>
+ removePendingSyncMember(group, memberId)
+
// if the group is stable, we just return the current assignment
val memberMetadata = group.get(memberId)
responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
@@ -617,7 +620,7 @@ class GroupCoordinator(val brokerId: Int,
def removeCurrentMemberFromGroup(group: GroupMetadata, memberId: String): Unit = {
val member = group.get(memberId)
removeMemberAndUpdateGroup(group, member, s"Removing member $memberId on LeaveGroup")
- removeHeartbeatForLeavingMember(group, member)
+ removeHeartbeatForLeavingMember(group, member.memberId)
info(s"Member $member has left group $groupId through explicit `LeaveGroup` request")
}
@@ -1104,7 +1107,7 @@ class GroupCoordinator(val brokerId: Int,
group.maybeInvokeJoinCallback(member, JoinGroupResult(member.memberId, Errors.NOT_COORDINATOR))
}
- joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+ rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
case Stable | CompletingRebalance =>
for (member <- group.allMemberMetadata) {
@@ -1112,6 +1115,8 @@ class GroupCoordinator(val brokerId: Int,
heartbeatPurgatory.checkAndComplete(MemberKey(group.groupId, member.memberId))
}
}
+
+ removeSyncExpiration(group)
}
}
@@ -1208,8 +1213,8 @@ class GroupCoordinator(val brokerId: Int,
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(pendingMemberKey))
}
- private def removeHeartbeatForLeavingMember(group: GroupMetadata, member: MemberMetadata): Unit = {
- val memberKey = MemberKey(group.groupId, member.memberId)
+ private def removeHeartbeatForLeavingMember(group: GroupMetadata, memberId: String): Unit = {
+ val memberKey = MemberKey(group.groupId, memberId)
heartbeatPurgatory.checkAndComplete(memberKey)
}
@@ -1342,9 +1347,12 @@ class GroupCoordinator(val brokerId: Int,
if (group.is(CompletingRebalance))
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
+ // if a sync expiration is pending, cancel it.
+ removeSyncExpiration(group)
+
val delayedRebalance = if (group.is(Empty))
new InitialDelayedJoin(this,
- joinPurgatory,
+ rebalancePurgatory,
group,
groupConfig.groupInitialRebalanceDelayMs,
groupConfig.groupInitialRebalanceDelayMs,
@@ -1357,8 +1365,8 @@ class GroupCoordinator(val brokerId: Int,
info(s"Preparing to rebalance group ${group.groupId} in state
${group.currentState} with old generation " +
s"${group.generationId}
(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason:
$reason)")
- val groupKey = GroupKey(group.groupId)
- joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
+ val groupKey = GroupJoinKey(group.groupId)
+ rebalancePurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String): Unit = {
@@ -1371,7 +1379,7 @@ class GroupCoordinator(val brokerId: Int,
group.currentState match {
case Dead | Empty =>
case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
- case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+ case PreparingRebalance => rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
}
}
@@ -1379,7 +1387,7 @@ class GroupCoordinator(val brokerId: Int,
group.remove(memberId)
if (group.is(PreparingRebalance)) {
- joinPurgatory.checkAndComplete(GroupKey(group.groupId))
+ rebalancePurgatory.checkAndComplete(GroupJoinKey(group.groupId))
}
}
@@ -1398,9 +1406,9 @@ class GroupCoordinator(val brokerId: Int,
info(s"Group ${group.groupId} removed dynamic members " +
s"who haven't joined: ${notYetRejoinedDynamicMembers.keySet}")
- notYetRejoinedDynamicMembers.values foreach { failedMember =>
+ notYetRejoinedDynamicMembers.values.foreach { failedMember =>
group.remove(failedMember.memberId)
- removeHeartbeatForLeavingMember(group, failedMember)
+ removeHeartbeatForLeavingMember(group, failedMember.memberId)
}
}
@@ -1411,9 +1419,9 @@ class GroupCoordinator(val brokerId: Int,
// of rebalance preparing stage, and send out another delayed operation
// until session timeout removes all the non-responsive members.
error(s"Group ${group.groupId} could not complete rebalance because no
members rejoined")
- joinPurgatory.tryCompleteElseWatch(
+ rebalancePurgatory.tryCompleteElseWatch(
new DelayedJoin(this, group, group.rebalanceTimeoutMs),
- Seq(GroupKey(group.groupId)))
+ Seq(GroupJoinKey(group.groupId)))
} else {
group.initNextGeneration()
if (group.is(Empty)) {
@@ -1450,7 +1458,95 @@ class GroupCoordinator(val brokerId: Int,
group.maybeInvokeJoinCallback(member, joinResult)
completeAndScheduleNextHeartbeatExpiration(group, member)
member.isNew = false
+
+ group.addPendingSyncMember(member.memberId)
}
+
+ schedulePendingSync(group)
+ }
+ }
+ }
+ }
+
+ private def removePendingSyncMember(
+ group: GroupMetadata,
+ memberId: String
+ ): Unit = {
+ group.removePendingSyncMember(memberId)
+ maybeCompleteSyncExpiration(group)
+ }
+
+ private def removeSyncExpiration(
+ group: GroupMetadata
+ ): Unit = {
+ group.clearPendingSyncMembers()
+ maybeCompleteSyncExpiration(group)
+ }
+
+ private def maybeCompleteSyncExpiration(
+ group: GroupMetadata
+ ): Unit = {
+ val groupKey = GroupSyncKey(group.groupId)
+ rebalancePurgatory.checkAndComplete(groupKey)
+ }
+
+ private def schedulePendingSync(
+ group: GroupMetadata
+ ): Unit = {
+ val delayedSync = new DelayedSync(this, group, group.generationId, group.rebalanceTimeoutMs)
+ val groupKey = GroupSyncKey(group.groupId)
+ rebalancePurgatory.tryCompleteElseWatch(delayedSync, Seq(groupKey))
+ }
+
+ def tryCompletePendingSync(
+ group: GroupMetadata,
+ generationId: Int,
+ forceComplete: () => Boolean
+ ): Boolean = {
+ group.inLock {
+ if (generationId != group.generationId) {
+ forceComplete()
+ } else {
+ group.currentState match {
+ case Dead | Empty | PreparingRebalance =>
+ forceComplete()
+ case CompletingRebalance | Stable =>
+ if (group.hasReceivedSyncFromAllMembers)
+ forceComplete()
+ else false
+ }
+ }
+ }
+ }
+
+ def onExpirePendingSync(
+ group: GroupMetadata,
+ generationId: Int
+ ): Unit = {
+ group.inLock {
+ if (generationId != group.generationId) {
+ error(s"Received unexpected notification of sync expiration for
${group.groupId} " +
+ s"with an old generation $generationId while the group has
${group.generationId}.")
+ } else {
+ group.currentState match {
+ case Dead | Empty | PreparingRebalance =>
+ error(s"Received unexpected notification of sync expiration after
group ${group.groupId} " +
+ s"already transitioned to the ${group.currentState} state.")
+
+ case CompletingRebalance | Stable =>
+ if (!group.hasReceivedSyncFromAllMembers) {
+ val pendingSyncMembers = group.allPendingSyncMembers
+
+ pendingSyncMembers.foreach { memberId =>
+ group.remove(memberId)
+ removeHeartbeatForLeavingMember(group, memberId)
+ }
+
+ debug(s"Group ${group.groupId} removed members who haven't " +
+ s"sent their sync request: $pendingSyncMembers")
+
+ prepareRebalance(group, s"Removing $pendingSyncMembers on pending sync request expiration")
+ }
}
}
}
@@ -1532,8 +1628,8 @@ object GroupCoordinator {
time: Time,
metrics: Metrics): GroupCoordinator = {
val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
- val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
- GroupCoordinator(config, replicaManager, heartbeatPurgatory, joinPurgatory, time, metrics)
+ val rebalancePurgatory = DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
+ GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, time, metrics)
}
private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
@@ -1552,7 +1648,7 @@ object GroupCoordinator {
def apply(config: KafkaConfig,
replicaManager: ReplicaManager,
heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
- joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
+ rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance],
time: Time,
metrics: Metrics): GroupCoordinator = {
val offsetConfig = this.offsetConfig(config)
@@ -1563,7 +1659,8 @@ object GroupCoordinator {
val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
offsetConfig, replicaManager, time, metrics)
- new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)
+ new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory,
+ rebalancePurgatory, time, metrics)
}
private def memberLeaveError(memberIdentity: MemberIdentity,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 53bce0b..5cb8a73 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -214,6 +214,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
private var receivedTransactionalOffsetCommits = false
private var receivedConsumerOffsetCommits = false
+ private val pendingSyncMembers = new mutable.HashSet[String]
// When protocolType == `consumer`, a set of subscribed topics is maintained. The set is
// computed when a new generation is created or when the group is restored from the log.
@@ -274,6 +275,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
leaderId = members.keys.headOption
pendingMembers.remove(memberId)
+ pendingSyncMembers.remove(memberId)
}
/**
@@ -344,6 +346,34 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
pendingMembers.add(memberId)
}
+ def addPendingSyncMember(memberId: String): Boolean = {
+ if (!has(memberId)) {
+ throw new IllegalStateException(s"Attempt to add a pending sync for
member $memberId which " +
+ "is not a member of the group")
+ }
+ pendingSyncMembers.add(memberId)
+ }
+
+ def removePendingSyncMember(memberId: String): Boolean = {
+ if (!has(memberId)) {
+ throw new IllegalStateException(s"Attempt to remove a pending sync for
member $memberId which " +
+ "is not a member of the group")
+ }
+ pendingSyncMembers.remove(memberId)
+ }
+
+ def hasReceivedSyncFromAllMembers: Boolean = {
+ pendingSyncMembers.isEmpty
+ }
+
+ def allPendingSyncMembers: Set[String] = {
+ pendingSyncMembers.toSet
+ }
+
+ def clearPendingSyncMembers(): Unit = {
+ pendingSyncMembers.clear()
+ }
+
def hasStaticMember(groupInstanceId: String): Boolean = {
staticMembers.contains(groupInstanceId)
}
@@ -546,6 +576,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
}
receivedConsumerOffsetCommits = false
receivedTransactionalOffsetCommits = false
+ clearPendingSyncMembers()
}
def currentMemberMetadata: List[JoinGroupResponseMember] = {
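Taken together, these methods keep `pendingSyncMembers` a subset of the
group's members: add/remove throw for unknown members, `remove` also drops
the member's pending sync, and a new generation clears the set. A small
standalone check of that invariant (illustrative only, not the real
`GroupMetadata`):

    // Standalone model of the invariant enforced above.
    object PendingSyncInvariantSketch extends App {
      var members = Set("m1", "m2")
      var pendingSync = Set.empty[String]

      def addPendingSync(id: String): Unit = {
        require(members.contains(id), s"$id is not a member of the group")
        pendingSync += id
      }

      def removeMember(id: String): Unit = {
        members -= id
        pendingSync -= id // remove() also clears the member's pending sync
      }

      addPendingSync("m1")
      removeMember("m1")
      assert(pendingSync.subsetOf(members))
      println("invariant holds")
    }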
diff --git a/core/src/main/scala/kafka/server/DelayedOperationKey.scala b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
index 3be412b..05a6a99 100644
--- a/core/src/main/scala/kafka/server/DelayedOperationKey.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperationKey.scala
@@ -32,9 +32,7 @@ object DelayedOperationKey {
/* used by delayed-produce and delayed-fetch operations */
case class TopicPartitionOperationKey(topic: String, partition: Int) extends DelayedOperationKey {
-
-
- override def keyLabel = "%s-%d".format(topic, partition)
+ override def keyLabel: String = "%s-%d".format(topic, partition)
}
object TopicPartitionOperationKey {
@@ -45,18 +43,20 @@ object TopicPartitionOperationKey {
/* used by delayed-join-group operations */
case class MemberKey(groupId: String, consumerId: String) extends DelayedOperationKey {
-
- override def keyLabel = "%s-%s".format(groupId, consumerId)
+ override def keyLabel: String = "%s-%s".format(groupId, consumerId)
}
-/* used by delayed-rebalance operations */
-case class GroupKey(groupId: String) extends DelayedOperationKey {
+/* used by delayed-join operations */
+case class GroupJoinKey(groupId: String) extends DelayedOperationKey {
+ override def keyLabel: String = "join-%s".format(groupId)
+}
- override def keyLabel = groupId
+/* used by delayed-sync operations */
+case class GroupSyncKey(groupId: String) extends DelayedOperationKey {
+ override def keyLabel: String = "sync-%s".format(groupId)
}
/* used by delayed-topic operations */
case class TopicKey(topic: String) extends DelayedOperationKey {
-
- override def keyLabel = topic
+ override def keyLabel: String = topic
}
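The split of `GroupKey` into `GroupJoinKey` and `GroupSyncKey` matters
because `DelayedJoin` and `DelayedSync` now share one purgatory: distinct
case classes give the two operation kinds distinct watch lists, so
completing a sync cannot accidentally poke a pending join (the `join-` and
`sync-` key labels are for metrics and debugging). A simplified standalone
illustration, not the real `DelayedOperationPurgatory`:

    // Simplified model of keyed watch lists in a shared purgatory.
    object KeySplitSketch extends App {
      sealed trait Key
      case class GroupJoinKey(groupId: String) extends Key
      case class GroupSyncKey(groupId: String) extends Key

      // One watch list per key value; different case classes never
      // collide, even for the same groupId.
      val watchers: Map[Key, String] = Map(
        GroupJoinKey("my-group") -> "DelayedJoin",
        GroupSyncKey("my-group") -> "DelayedSync"
      )

      println(watchers(GroupJoinKey("my-group"))) // DelayedJoin
      println(watchers(GroupSyncKey("my-group"))) // DelayedSync
    }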
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index 4facf85..2ef487c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -62,7 +62,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
)
var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = _
- var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = _
+ var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = _
var groupCoordinator: GroupCoordinator = _
@BeforeEach
@@ -81,9 +81,9 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
val config = KafkaConfig.fromProps(serverProps)
heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
- joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
+ rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, config.brokerId, reaperEnabled = false)
- groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time, new Metrics())
+ groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, new Metrics())
groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
false)
}
@@ -150,7 +150,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
if (groupCoordinator != null)
groupCoordinator.shutdown()
groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory,
- joinPurgatory, timer.time, new Metrics())
+ rebalancePurgatory, timer.time, new Metrics())
groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
false)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index cf14a36..0784259 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -113,9 +113,9 @@ class GroupCoordinatorTest {
val config = KafkaConfig.fromProps(props)
val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
- val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
+ val rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", timer, config.brokerId, reaperEnabled = false)
- groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time, new Metrics())
+ groupCoordinator = GroupCoordinator(config, replicaManager, heartbeatPurgatory, rebalancePurgatory, timer.time, new Metrics())
groupCoordinator.startup(() => zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicPartitions),
enableMetadataExpiration = false)
@@ -1667,7 +1667,7 @@ class GroupCoordinatorTest {
}
@Test
- def testheartbeatDeadGroup(): Unit = {
+ def testHeartbeatDeadGroup(): Unit = {
val memberId = "memberId"
val deadGroupId = "deadGroupId"
@@ -1678,7 +1678,7 @@ class GroupCoordinatorTest {
}
@Test
- def testheartbeatEmptyGroup(): Unit = {
+ def testHeartbeatEmptyGroup(): Unit = {
val memberId = "memberId"
val group = new GroupMetadata(groupId, Empty, new MockTime())
@@ -2254,6 +2254,209 @@ class GroupCoordinatorTest {
assertEquals(0, group().numPending)
}
+ private def verifyHeartbeat(
+ joinGroupResult: JoinGroupResult,
+ expectedError: Errors
+ ): Unit = {
+ EasyMock.reset(replicaManager)
+ val heartbeatResult = heartbeat(
+ groupId,
+ joinGroupResult.memberId,
+ joinGroupResult.generationId
+ )
+ assertEquals(expectedError, heartbeatResult)
+ }
+
+ private def joinWithNMembers(nbMembers: Int): Seq[JoinGroupResult] = {
+ val requiredKnownMemberId = true
+
+ // First JoinRequests
+ var futures = 1.to(nbMembers).map { _ =>
+ EasyMock.reset(replicaManager)
+ sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+ None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+ }
+
+ // Get back the assigned member ids
+ val memberIds = futures.map(await(_, 1).memberId)
+
+ // Second JoinRequests
+ futures = memberIds.map { memberId =>
+ EasyMock.reset(replicaManager)
+ sendJoinGroup(groupId, memberId, protocolType, protocols,
+ None, DefaultSessionTimeout, DefaultRebalanceTimeout, requiredKnownMemberId)
+ }
+
+ timer.advanceClock(GroupInitialRebalanceDelay + 1)
+ timer.advanceClock(DefaultRebalanceTimeout + 1)
+
+ futures.map(await(_, 1))
+ }
+
+ @Test
+ def testRebalanceTimesOutWhenSyncRequestIsNotReceived(): Unit = {
+ // This test case ensures that the DelayedSync does kick out all members
+ // if they don't send a sync request before the rebalance timeout. The
+ // group is in the Stable state in this case.
+ val results = joinWithNMembers(nbMembers = 3)
+ assertEquals(Set(Errors.NONE), results.map(_.error).toSet)
+
+ // Advance time
+ timer.advanceClock(DefaultRebalanceTimeout / 2)
+
+ // Heartbeats to ensure that heartbeating does not interfere with the
+ // delayed sync operation.
+ results.foreach { joinGroupResult =>
+ verifyHeartbeat(joinGroupResult, Errors.NONE)
+ }
+
+ // Advance past the rebalance timeout to trigger the delayed operation.
+ EasyMock.reset(replicaManager)
+ EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject()))
+ .andReturn(Some(RecordBatch.MAGIC_VALUE_V1))
+ .anyTimes()
+ EasyMock.replay(replicaManager)
+
+ timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)
+
+ // Heartbeats fail because none of the members have sent the sync request
+ results.foreach { joinGroupResult =>
+ verifyHeartbeat(joinGroupResult, Errors.UNKNOWN_MEMBER_ID)
+ }
+ }
+
+ @Test
+ def testRebalanceTimesOutWhenSyncRequestIsNotReceivedFromFollowers(): Unit = {
+ // This test case ensures that the DelayedSync does kick out the followers
+ // if they don't send a sync request before the rebalance timeout. The
+ // group is in the Stable state in this case.
+ val results = joinWithNMembers(nbMembers = 3)
+ assertEquals(Set(Errors.NONE), results.map(_.error).toSet)
+
+ // Advance time
+ timer.advanceClock(DefaultRebalanceTimeout / 2)
+
+ // Heartbeats to ensure that heartbeating does not interfere with the
+ // delayed sync operation.
+ results.foreach { joinGroupResult =>
+ verifyHeartbeat(joinGroupResult, Errors.NONE)
+ }
+
+ // Leader sends Sync
+ EasyMock.reset(replicaManager)
+ val assignments = results.map(result => result.memberId -> Array.empty[Byte]).toMap
+ val leaderResult = sendSyncGroupLeader(groupId, results.head.generationId, results.head.memberId,
+ Some(protocolType), Some(protocolName), None, assignments)
+
+ assertEquals(Errors.NONE, await(leaderResult, 1).error)
+
+ // Leader should be able to heartbeat
+ verifyHeartbeat(results.head, Errors.NONE)
+
+ // Advance past the rebalance timeout to trigger the delayed operation.
+ timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)
+
+ // Leader heartbeats now report that a rebalance is in progress.
+ verifyHeartbeat(results.head, Errors.REBALANCE_IN_PROGRESS)
+
+ // Followers should have been removed.
+ results.tail.foreach { joinGroupResult =>
+ verifyHeartbeat(joinGroupResult, Errors.UNKNOWN_MEMBER_ID)
+ }
+ }
+
+ @Test
+ def testRebalanceTimesOutWhenSyncRequestIsNotReceivedFromLeaders(): Unit = {
+ // This test case ensures that the DelayedSync does kick out the leader
+ // if it does not send a sync request before the rebalance timeout. The
+ // group is in the CompletingRebalance state in this case.
+ val results = joinWithNMembers(nbMembers = 3)
+ assertEquals(Set(Errors.NONE), results.map(_.error).toSet)
+
+ // Advance time
+ timer.advanceClock(DefaultRebalanceTimeout / 2)
+
+ // Heartbeats to ensure that heartbeating does not interfere with the
+ // delayed sync operation.
+ results.foreach { joinGroupResult =>
+ verifyHeartbeat(joinGroupResult, Errors.NONE)
+ }
+
+ // Followers send Sync
+ EasyMock.reset(replicaManager)
+ val followerResults = results.tail.map { joinGroupResult =>
+ EasyMock.reset(replicaManager)
+ sendSyncGroupFollower(groupId, joinGroupResult.generationId, joinGroupResult.memberId,
+ Some(protocolType), Some(protocolName), None)
+ }
+
+ // Advance past the rebalance timeout to trigger the delayed operation.
+ timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)
+
+ val followerErrors = followerResults.map(await(_, 1).error)
+ assertEquals(Set(Errors.REBALANCE_IN_PROGRESS), followerErrors.toSet)
+
+ // Leader should have been removed.
+ verifyHeartbeat(results.head, Errors.UNKNOWN_MEMBER_ID)
+
+ // Followers should be able to heartbeat.
+ results.tail.foreach { joinGroupResult =>
+ verifyHeartbeat(joinGroupResult, Errors.REBALANCE_IN_PROGRESS)
+ }
+ }
+
+ @Test
+ def testRebalanceDoesNotTimeOutWhenAllSyncAreReceived(): Unit = {
+ // This test case ensures that the DelayedSync does not kick any
+ // members out when they have all sent their sync requests.
+ val results = joinWithNMembers(nbMembers = 3)
+ assertEquals(Set(Errors.NONE), results.map(_.error).toSet)
+
+ // Advance time
+ timer.advanceClock(DefaultRebalanceTimeout / 2)
+
+ // Heartbeats to ensure that heartbeating does not interfere with the
+ // delayed sync operation.
+ results.foreach { joinGroupResult =>
+ verifyHeartbeat(joinGroupResult, Errors.NONE)
+ }
+
+ EasyMock.reset(replicaManager)
+ val assignments = results.map(result => result.memberId -> Array.empty[Byte]).toMap
+ val leaderResult = sendSyncGroupLeader(groupId, results.head.generationId, results.head.memberId,
+ Some(protocolType), Some(protocolName), None, assignments)
+
+ assertEquals(Errors.NONE, await(leaderResult, 1).error)
+
+ // Followers send Sync
+ EasyMock.reset(replicaManager)
+ val followerResults = results.tail.map { joinGroupResult =>
+ EasyMock.reset(replicaManager)
+ sendSyncGroupFollower(groupId, joinGroupResult.generationId, joinGroupResult.memberId,
+ Some(protocolType), Some(protocolName), None)
+ }
+
+ val followerErrors = followerResults.map(await(_, 1).error)
+ assertEquals(Set(Errors.NONE), followerErrors.toSet)
+
+ // Advance past the rebalance timeout to expire the Sync timeout. All
+ // members should remain and the group should not rebalance.
+ timer.advanceClock(DefaultRebalanceTimeout / 2 + 1)
+
+ // All members should be able to heartbeat.
+ results.foreach { joinGroupResult =>
+ verifyHeartbeat(joinGroupResult, Errors.NONE)
+ }
+
+ // Advance a bit more.
+ timer.advanceClock(DefaultRebalanceTimeout / 2)
+
+ // All members should be able to heartbeat.
+ results.foreach { joinGroupResult =>
+ verifyHeartbeat(joinGroupResult, Errors.NONE)
+ }
+ }
+
private def group(groupId: String = groupId) = {
groupCoordinator.groupManager.getGroup(groupId) match {
case Some(g) => g
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index ce6128a..275b7f6 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -663,6 +663,52 @@ class GroupMetadataTest {
assertThrows(classOf[IllegalStateException], () => group.add(member))
}
+ @Test
+ def testCannotAddPendingSyncOfUnknownMember(): Unit = {
+ assertThrows(classOf[IllegalStateException],
+ () => group.addPendingSyncMember(memberId))
+ }
+
+ @Test
+ def testCannotRemovePendingSyncOfUnknownMember(): Unit = {
+ assertThrows(classOf[IllegalStateException],
+ () => group.removePendingSyncMember(memberId))
+ }
+
+ @Test
+ def testCanAddAndRemovePendingSyncMember(): Unit = {
+ val member = new MemberMetadata(memberId, Some(groupInstanceId), clientId, clientHost,
+ rebalanceTimeoutMs, sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte])))
+ group.add(member)
+ group.addPendingSyncMember(memberId)
+ assertEquals(Set(memberId), group.allPendingSyncMembers)
+ group.removePendingSyncMember(memberId)
+ assertEquals(Set(), group.allPendingSyncMembers)
+ }
+
+ @Test
+ def testRemovalFromPendingSyncWhenMemberIsRemoved(): Unit = {
+ val member = new MemberMetadata(memberId, Some(groupInstanceId), clientId, clientHost,
+ rebalanceTimeoutMs, sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte])))
+ group.add(member)
+ group.addPendingSyncMember(memberId)
+ assertEquals(Set(memberId), group.allPendingSyncMembers)
+ group.remove(memberId)
+ assertEquals(Set(), group.allPendingSyncMembers)
+ }
+
+ @Test
+ def testNewGenerationClearsPendingSyncMembers(): Unit = {
+ val member = new MemberMetadata(memberId, Some(groupInstanceId), clientId, clientHost,
+ rebalanceTimeoutMs, sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte])))
+ group.add(member)
+ group.transitionTo(PreparingRebalance)
+ group.addPendingSyncMember(memberId)
+ assertEquals(Set(memberId), group.allPendingSyncMembers)
+ group.initNextGeneration()
+ assertEquals(Set(), group.allPendingSyncMembers)
+ }
+
private def assertState(group: GroupMetadata, targetState: GroupState): Unit = {
val states: Set[GroupState] = Set(Stable, PreparingRebalance, CompletingRebalance, Dead)
val otherStates = states - targetState