Repository: kafka Updated Branches: refs/heads/trunk a1ea53606 -> 6d6080f13
KAFKA-5565: Add a broker metric specifying the number of consumer group rebalances in progress …up rebalances in progress Author: Colin P. Mccabe <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>, Guozhang Wang <[email protected]> Closes #3506 from cmccabe/KAFKA-5565 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6d6080f1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6d6080f1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6d6080f1 Branch: refs/heads/trunk Commit: 6d6080f13633508c57e48cbc12788ce643af4953 Parents: a1ea536 Author: Colin P. Mccabe <[email protected]> Authored: Mon Oct 9 09:28:42 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon Oct 9 09:28:42 2017 -0700 ---------------------------------------------------------------------- .../common/requests/DescribeGroupsResponse.java | 2 +- .../common/requests/JoinGroupResponse.java | 14 ++++++ .../kafka/admin/ConsumerGroupCommand.scala | 2 +- .../coordinator/group/GroupCoordinator.scala | 22 ++++----- .../kafka/coordinator/group/GroupMetadata.scala | 14 +++--- .../group/GroupMetadataManager.scala | 50 +++++++++++++++++--- .../scala/kafka/metrics/KafkaMetricsGroup.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 2 +- .../group/GroupCoordinatorTest.scala | 4 +- .../group/GroupMetadataManagerTest.scala | 29 +++++++++++- .../coordinator/group/GroupMetadataTest.scala | 30 ++++++------ 11 files changed, 125 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java index 61c5a36..313e113 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -64,7 +64,7 @@ public class DescribeGroupsResponse extends AbstractResponse { private static final Schema DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0 = new Schema( ERROR_CODE, new Field(GROUP_ID_KEY_NAME, STRING), - new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the group (one of: Dead, Stable, AwaitingSync, " + + new Field(GROUP_STATE_KEY_NAME, STRING, "The current state of the group (one of: Dead, Stable, CompletingRebalance, " + "PreparingRebalance, or empty if there is no active group)"), new Field(PROTOCOL_TYPE_KEY_NAME, STRING, "The current group protocol type (will be empty if there is no active group)"), new Field(PROTOCOL_KEY_NAME, STRING, "The current group protocol (only provided if the group is Stable)"), http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index 56491eb..d9f987b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -202,4 +203,17 @@ public class 
JoinGroupResponse extends AbstractResponse { return struct; } + + @Override + public String toString() { + return "JoinGroupResponse" + + "(throttleTimeMs=" + throttleTimeMs + + ", error=" + error + + ", generationId=" + generationId + + ", groupProtocol=" + groupProtocol + + ", memberId=" + memberId + + ", leaderId=" + leaderId + + ", members=" + ((members == null) ? "null" : + Utils.join(members.keySet(), ",")) + ")"; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 2120657..d71f062 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -89,7 +89,7 @@ object ConsumerGroupCommand extends Logging { case Some("Empty") => System.err.println(s"Consumer group '$groupId' has no active members.") printAssignment(assignments, true) - case Some("PreparingRebalance") | Some("AwaitingSync") => + case Some("PreparingRebalance") | Some("CompletingRebalance") => System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.") printAssignment(assignments, true) case Some("Stable") => http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index bb59bcd..dd4d52d 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -169,7 +169,7 @@ class GroupCoordinator(val brokerId: Int, updateMemberAndRebalance(group, member, protocols, 
responseCallback) } - case AwaitingSync => + case CompletingRebalance => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback) } else { @@ -261,7 +261,7 @@ class GroupCoordinator(val brokerId: Int, case PreparingRebalance => responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS) - case AwaitingSync => + case CompletingRebalance => group.get(memberId).awaitingSyncCallback = responseCallback // if this is the leader, then we can attempt to persist state and transition to stable @@ -275,9 +275,9 @@ class GroupCoordinator(val brokerId: Int, groupManager.storeGroup(group, assignment, (error: Errors) => { group synchronized { // another member may have joined the group while we were awaiting this callback, - // so we must ensure we are still in the AwaitingSync state and the same generation + // so we must ensure we are still in the CompletingRebalance state and the same generation // when it gets invoked. if we have transitioned to another state, then do nothing - if (group.is(AwaitingSync) && generationId == group.generationId) { + if (group.is(CompletingRebalance) && generationId == group.generationId) { if (error != Errors.NONE) { resetAndPropagateAssignmentError(group, error) maybePrepareRebalance(group) @@ -361,7 +361,7 @@ class GroupCoordinator(val brokerId: Int, case Empty => responseCallback(Errors.UNKNOWN_MEMBER_ID) - case AwaitingSync => + case CompletingRebalance => if (!group.has(memberId)) responseCallback(Errors.UNKNOWN_MEMBER_ID) else @@ -456,7 +456,7 @@ class GroupCoordinator(val brokerId: Int, // the group is only using Kafka to store offsets // Also, for transactional offset commits we don't need to validate group membership and the generation. 
groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch) - } else if (group.is(AwaitingSync)) { + } else if (group.is(CompletingRebalance)) { responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS)) } else if (!group.has(memberId)) { responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID)) @@ -545,7 +545,7 @@ class GroupCoordinator(val brokerId: Int, } joinPurgatory.checkAndComplete(GroupKey(group.groupId)) - case Stable | AwaitingSync => + case Stable | CompletingRebalance => for (member <- group.allMemberMetadata) { if (member.awaitingSyncCallback != null) { member.awaitingSyncCallback(Array.empty[Byte], Errors.NOT_COORDINATOR) @@ -574,13 +574,13 @@ class GroupCoordinator(val brokerId: Int, } private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) { - assert(group.is(AwaitingSync)) + assert(group.is(CompletingRebalance)) group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId)) propagateAssignment(group, Errors.NONE) } private def resetAndPropagateAssignmentError(group: GroupMetadata, error: Errors) { - assert(group.is(AwaitingSync)) + assert(group.is(CompletingRebalance)) group.allMemberMetadata.foreach(_.assignment = Array.empty[Byte]) propagateAssignment(group, error) } @@ -674,7 +674,7 @@ class GroupCoordinator(val brokerId: Int, private def prepareRebalance(group: GroupMetadata) { // if any members are awaiting sync, cancel their request and have them rejoin - if (group.is(AwaitingSync)) + if (group.is(CompletingRebalance)) resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS) val delayedRebalance = if (group.is(Empty)) @@ -700,7 +700,7 @@ class GroupCoordinator(val brokerId: Int, group.remove(member.memberId) group.currentState match { case Dead | Empty => - case Stable | AwaitingSync => maybePrepareRebalance(group) + case Stable | CompletingRebalance => maybePrepareRebalance(group) 
case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 18096bb..c4e071d 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -35,7 +35,7 @@ private[group] sealed trait GroupState * park join group requests from new or existing members until all expected members have joined * allow offset commits from previous generation * allow offset fetch requests - * transition: some members have joined by the timeout => AwaitingSync + * transition: some members have joined by the timeout => CompletingRebalance * all members have left the group => Empty * group is removed by partition emigration => Dead */ @@ -54,7 +54,7 @@ private[group] case object PreparingRebalance extends GroupState * member failure detected => PreparingRebalance * group is removed by partition emigration => Dead */ -private[group] case object AwaitingSync extends GroupState +private[group] case object CompletingRebalance extends GroupState /** * Group is stable @@ -105,10 +105,10 @@ private[group] case object Empty extends GroupState private object GroupMetadata { private val validPreviousStates: Map[GroupState, Set[GroupState]] = - Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync, Empty, Dead), - AwaitingSync -> Set(PreparingRebalance), - Stable -> Set(AwaitingSync), - PreparingRebalance -> Set(Stable, AwaitingSync, Empty), + Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead), + CompletingRebalance -> Set(PreparingRebalance), + Stable -> Set(CompletingRebalance), + PreparingRebalance -> Set(Stable, 
CompletingRebalance, Empty), Empty -> Set(PreparingRebalance)) } @@ -256,7 +256,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState if (members.nonEmpty) { generationId += 1 protocol = selectProtocol - transitionTo(AwaitingSync) + transitionTo(CompletingRebalance) } else { generationId += 1 protocol = null http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 7519dc4..8ef4894 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -82,19 +82,57 @@ class GroupMetadataManager(brokerId: Int, this.logIdent = s"[GroupMetadataManager brokerId=$brokerId] " - newGauge("NumOffsets", + private def recreateGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = { + removeMetric(name) + newGauge(name, gauge) + } + + recreateGauge("NumOffsets", new Gauge[Int] { def value = groupMetadataCache.values.map(group => { group synchronized { group.numOffsets } }).sum - } - ) + }) - newGauge("NumGroups", + recreateGauge("NumGroups", new Gauge[Int] { def value = groupMetadataCache.size - } - ) + }) + + recreateGauge("NumGroupsPreparingRebalance", + new Gauge[Int] { + def value(): Int = groupMetadataCache.values.count(group => { + group synchronized { group.is(PreparingRebalance) } + }) + }) + + recreateGauge("NumGroupsCompletingRebalance", + new Gauge[Int] { + def value(): Int = groupMetadataCache.values.count(group => { + group synchronized { group.is(CompletingRebalance) } + }) + }) + + recreateGauge("NumGroupsStable", + new Gauge[Int] { + def value(): Int = groupMetadataCache.values.count(group => { + group synchronized { group.is(Stable) } + }) 
+ }) + + recreateGauge("NumGroupsDead", + new Gauge[Int] { + def value(): Int = groupMetadataCache.values.count(group => { + group synchronized { group.is(Dead) } + }) + }) + + recreateGauge("NumGroupsEmpty", + new Gauge[Int] { + def value(): Int = groupMetadataCache.values.count(group => { + group synchronized { group.is(Empty) } + }) + }) def enableMetadataExpiration() { scheduler.startup() http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index b9ab486..46db8bc 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -40,7 +40,7 @@ trait KafkaMetricsGroup extends Logging { * @param tags Additional attributes which mBean will have. * @return Sanitized metric name object. 
*/ - protected def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = { + def metricName(name: String, tags: scala.collection.Map[String, String]): MetricName = { val klass = this.getClass val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName val simpleName = klass.getSimpleName.replaceAll("\\$$", "") http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 755f500..0b5d43e 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -234,7 +234,7 @@ class ZooKeeperClientMetrics(zkClient: ZkClient, val time: Time) extends ZooKeeperClientWrapper(zkClient) with KafkaMetricsGroup { val latencyMetric = newHistogram("ZooKeeperRequestLatencyMs") - override protected def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = { + override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = { explicitMetricName("kafka.server", "ZooKeeperClientMetrics", name, metricTags) } http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 3fed45d..c9f2ec6 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -1049,7 +1049,7 @@ class GroupCoordinatorTest extends JUnitSuite { } @Test - def testCommitOffsetInAwaitingSync() { + def 
testCommitOffsetInCompletingRebalance() { val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID val tp = new TopicPartition("topic", 0) val offset = OffsetAndMetadata(0) @@ -1232,7 +1232,7 @@ class GroupCoordinatorTest extends JUnitSuite { assertEquals(Errors.NONE, error) assertEquals(protocolType, summary.protocolType) assertEquals(GroupCoordinator.NoProtocol, summary.protocol) - assertEquals(AwaitingSync.toString, summary.state) + assertEquals(CompletingRebalance.toString, summary.state) assertTrue(summary.members.map(_.memberId).contains(joinGroupResult.memberId)) assertTrue(summary.members.forall(_.metadata.isEmpty)) assertTrue(summary.members.forall(_.assignment.isEmpty)) http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 46a1878..8e5b593 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -23,7 +23,7 @@ import kafka.common.OffsetAndMetadata import kafka.log.{Log, LogAppendInfo} import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager} import kafka.utils.TestUtils.fail -import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils} +import kafka.utils.{KafkaScheduler, Logging, MockTime, TestUtils, ZkUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record._ @@ -34,6 +34,8 @@ import org.junit.Assert.{assertEquals, assertFalse, assertTrue} import org.junit.{Before, Test} import java.nio.ByteBuffer +import com.yammer.metrics.Metrics +import com.yammer.metrics.core.Gauge import 
org.apache.kafka.common.internals.Topic import scala.collection.JavaConverters._ @@ -1392,4 +1394,29 @@ class GroupMetadataManagerTest { EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) } + private def getGauge(manager: GroupMetadataManager, name: String): Gauge[Int] = { + Metrics.defaultRegistry().allMetrics().get(manager.metricName(name, Map.empty)).asInstanceOf[Gauge[Int]] + } + + private def expectMetrics(manager: GroupMetadataManager, + expectedNumGroups: Int, + expectedNumGroupsPreparingRebalance: Int, + expectedNumGroupsCompletingRebalance: Int): Unit = { + assertEquals(expectedNumGroups, getGauge(manager, "NumGroups").value) + assertEquals(expectedNumGroupsPreparingRebalance, getGauge(manager, "NumGroupsPreparingRebalance").value) + assertEquals(expectedNumGroupsCompletingRebalance, getGauge(manager, "NumGroupsCompletingRebalance").value) + } + + @Test + def testMetrics() { + groupMetadataManager.cleanupGroupMetadata() + expectMetrics(groupMetadataManager, 0, 0, 0) + val group = new GroupMetadata("foo2", Stable) + groupMetadataManager.addGroup(group) + expectMetrics(groupMetadataManager, 1, 0, 0) + group.transitionTo(PreparingRebalance) + expectMetrics(groupMetadataManager, 1, 1, 0) + group.transitionTo(CompletingRebalance) + expectMetrics(groupMetadataManager, 1, 0, 1) + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/6d6080f1/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala index 2db6603..ca62bf8 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala @@ -49,9 +49,9 @@ class GroupMetadataTest extends JUnitSuite { } @Test - def 
testCanRebalanceWhenAwaitingSync() { + def testCanRebalanceWhenCompletingRebalance() { group.transitionTo(PreparingRebalance) - group.transitionTo(AwaitingSync) + group.transitionTo(CompletingRebalance) assertTrue(group.canRebalance) } @@ -82,9 +82,9 @@ class GroupMetadataTest extends JUnitSuite { } @Test - def testAwaitingSyncToPreparingRebalanceTransition() { + def testAwaitingRebalanceToPreparingRebalanceTransition() { group.transitionTo(PreparingRebalance) - group.transitionTo(AwaitingSync) + group.transitionTo(CompletingRebalance) group.transitionTo(PreparingRebalance) assertState(group, PreparingRebalance) } @@ -112,9 +112,9 @@ class GroupMetadataTest extends JUnitSuite { } @Test - def testAwaitingSyncToStableTransition() { + def testAwaitingRebalanceToStableTransition() { group.transitionTo(PreparingRebalance) - group.transitionTo(AwaitingSync) + group.transitionTo(CompletingRebalance) group.transitionTo(Stable) assertState(group, Stable) } @@ -127,7 +127,7 @@ class GroupMetadataTest extends JUnitSuite { @Test def testStableToStableIllegalTransition() { group.transitionTo(PreparingRebalance) - group.transitionTo(AwaitingSync) + group.transitionTo(CompletingRebalance) group.transitionTo(Stable) try { group.transitionTo(Stable) @@ -138,8 +138,8 @@ class GroupMetadataTest extends JUnitSuite { } @Test(expected = classOf[IllegalStateException]) - def testEmptyToAwaitingSyncIllegalTransition() { - group.transitionTo(AwaitingSync) + def testEmptyToAwaitingRebalanceIllegalTransition() { + group.transitionTo(CompletingRebalance) } @Test(expected = classOf[IllegalStateException]) @@ -155,10 +155,10 @@ class GroupMetadataTest extends JUnitSuite { } @Test(expected = classOf[IllegalStateException]) - def testAwaitingSyncToAwaitingSyncIllegalTransition() { + def testAwaitingRebalanceToAwaitingRebalanceIllegalTransition() { group.transitionTo(PreparingRebalance) - group.transitionTo(AwaitingSync) - group.transitionTo(AwaitingSync) + group.transitionTo(CompletingRebalance) 
+ group.transitionTo(CompletingRebalance) } def testDeadToDeadIllegalTransition() { @@ -183,10 +183,10 @@ class GroupMetadataTest extends JUnitSuite { } @Test(expected = classOf[IllegalStateException]) - def testDeadToAwaitingSyncIllegalTransition() { + def testDeadToAwaitingRebalanceIllegalTransition() { group.transitionTo(PreparingRebalance) group.transitionTo(Dead) - group.transitionTo(AwaitingSync) + group.transitionTo(CompletingRebalance) } @Test @@ -466,7 +466,7 @@ class GroupMetadataTest extends JUnitSuite { } private def assertState(group: GroupMetadata, targetState: GroupState) { - val states: Set[GroupState] = Set(Stable, PreparingRebalance, AwaitingSync, Dead) + val states: Set[GroupState] = Set(Stable, PreparingRebalance, CompletingRebalance, Dead) val otherStates = states - targetState otherStates.foreach { otherState => assertFalse(group.is(otherState))
