Repository: kafka Updated Branches: refs/heads/trunk fb42558e2 -> 8c551675a
http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala new file mode 100644 index 0000000..0bd6d71 --- /dev/null +++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala @@ -0,0 +1,407 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.coordinator + +import kafka.cluster.Partition +import kafka.common.OffsetAndMetadata +import kafka.log.LogAppendInfo +import kafka.message.{ByteBufferMessageSet, Message, MessageSet} +import kafka.server.{KafkaConfig, ReplicaManager} +import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.internals.TopicConstants +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.Record +import org.apache.kafka.common.requests.OffsetFetchResponse +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.easymock.{Capture, EasyMock, IAnswer} +import org.junit.{After, Before, Test} +import org.junit.Assert._ + +import scala.collection._ + +class GroupMetadataManagerTest { + + var time: MockTime = null + var replicaManager: ReplicaManager = null + var groupMetadataManager: GroupMetadataManager = null + var scheduler: KafkaScheduler = null + var zkUtils: ZkUtils = null + var partition: Partition = null + + val groupId = "foo" + val groupPartitionId = 0 + val protocolType = "protocolType" + val sessionTimeout = 30000 + + + @Before + def setUp() { + val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")) + + val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize, + loadBufferSize = config.offsetsLoadBufferSize, + offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, + offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, + offsetsTopicNumPartitions = config.offsetsTopicPartitions, + offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes, + offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, + offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec, + offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, + offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) + + // make two partitions of the group topic to make sure some partitions are not owned by the coordinator + val ret = mutable.Map[String, Map[Int, Seq[Int]]]() + ret += (TopicConstants.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1))) + + zkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) + EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(TopicConstants.GROUP_METADATA_TOPIC_NAME))).andReturn(ret) + EasyMock.replay(zkUtils) + + time = new MockTime + replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager]) + groupMetadataManager = new GroupMetadataManager(0, offsetConfig, replicaManager, zkUtils, time) + partition = EasyMock.niceMock(classOf[Partition]) + + } + + @After + def tearDown() { + EasyMock.reset(replicaManager) + EasyMock.reset(partition) + } + + @Test + def testAddGroup() { + val group = new GroupMetadata("foo") + assertEquals(group, groupMetadataManager.addGroup(group)) + assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo"))) + } + + @Test + def testStoreEmptyGroup() { + val group = new GroupMetadata(groupId) + groupMetadataManager.addGroup(group) + + expectAppendMessage(Errors.NONE) + EasyMock.replay(replicaManager) + + var errorCode: Option[Short] = None + def callback(error: Short) { + errorCode = Some(error) + } + + val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback) + groupMetadataManager.store(delayedStore) + assertEquals(Errors.NONE.code, errorCode.get) + } + + @Test + def testStoreNonEmptyGroup() { + val memberId = "memberId" + val clientId = "clientId" + val clientHost = "localhost" + + val group = new GroupMetadata(groupId) + groupMetadataManager.addGroup(group) + + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout, + protocolType, List(("protocol", Array[Byte]()))) + member.awaitingJoinCallback = (joinGroupResult: JoinGroupResult) => {} + group.add(memberId, member) + group.transitionTo(PreparingRebalance) + group.initNextGeneration() + + expectAppendMessage(Errors.NONE) + EasyMock.replay(replicaManager) + + var errorCode: Option[Short] = None + def callback(error: Short) { + errorCode = Some(error) + } + + val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback) + groupMetadataManager.store(delayedStore) + assertEquals(Errors.NONE.code, errorCode.get) + } + + @Test + def testCommitOffset() { + val memberId = "" + val generationId = -1 + val topicPartition = new TopicPartition("foo", 0) + val offset = 37 + + groupMetadataManager.addPartitionOwnership(groupPartitionId) + + val group = new GroupMetadata(groupId) + groupMetadataManager.addGroup(group) + + val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) + + expectAppendMessage(Errors.NONE) + EasyMock.replay(replicaManager) + + var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None + def callback(errors: immutable.Map[TopicPartition, Short]) { + commitErrors = Some(errors) + } + + val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback) + assertTrue(group.hasOffsets) + + groupMetadataManager.store(delayedStore) + + assertFalse(commitErrors.isEmpty) + val maybeError = commitErrors.get.get(topicPartition) + assertEquals(Some(Errors.NONE.code), maybeError) + assertTrue(group.hasOffsets) + + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition)) + val maybePartitionResponse = cachedOffsets.get(topicPartition) + assertFalse(maybePartitionResponse.isEmpty) + + val partitionResponse = maybePartitionResponse.get + assertEquals(Errors.NONE.code, partitionResponse.errorCode) + assertEquals(offset, partitionResponse.offset) + } + + @Test + def testCommitOffsetFailure() { + val memberId = "" + val generationId = -1 + val topicPartition = new TopicPartition("foo", 0) + val offset = 37 + + groupMetadataManager.addPartitionOwnership(groupPartitionId) + + val group = new GroupMetadata(groupId) + groupMetadataManager.addGroup(group) + + val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) + + expectAppendMessage(Errors.NOT_LEADER_FOR_PARTITION) + EasyMock.replay(replicaManager) + + var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None + def callback(errors: immutable.Map[TopicPartition, Short]) { + commitErrors = Some(errors) + } + + val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback) + assertTrue(group.hasOffsets) + + groupMetadataManager.store(delayedStore) + + assertFalse(commitErrors.isEmpty) + val maybeError = commitErrors.get.get(topicPartition) + assertEquals(Some(Errors.NOT_COORDINATOR_FOR_GROUP.code), maybeError) + assertFalse(group.hasOffsets) + + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset)) + } + + @Test + def testExpireOffset() { + val memberId = "" + val generationId = -1 + val topicPartition1 = new TopicPartition("foo", 0) + val topicPartition2 = new TopicPartition("foo", 1) + val offset = 37 + + groupMetadataManager.addPartitionOwnership(groupPartitionId) + + val group = new GroupMetadata(groupId) + groupMetadataManager.addGroup(group) + + // expire the offset after 1 millisecond + val startMs = time.milliseconds + val offsets = immutable.Map( + topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1), + topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3)) + + EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andStubReturn(Some(partition)) + expectAppendMessage(Errors.NONE) + EasyMock.replay(replicaManager) + + var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None + def callback(errors: immutable.Map[TopicPartition, Short]) { + commitErrors = Some(errors) + } + + val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback) + assertTrue(group.hasOffsets) + + groupMetadataManager.store(delayedStore) + assertFalse(commitErrors.isEmpty) + assertEquals(Some(Errors.NONE.code), commitErrors.get.get(topicPartition1)) + + // expire only one of the offsets + time.sleep(2) + + EasyMock.reset(partition) + EasyMock.expect(partition.appendMessagesToLeader(EasyMock.anyObject(classOf[ByteBufferMessageSet]), EasyMock.anyInt())) + .andReturn(LogAppendInfo.UnknownLogAppendInfo) + EasyMock.replay(partition) + + groupMetadataManager.cleanupGroupMetadata() + + assertEquals(Some(group), groupMetadataManager.getGroup(groupId)) + assertEquals(None, group.offset(topicPartition1)) + assertEquals(Some(offset), group.offset(topicPartition2).map(_.offset)) + + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset)) + assertEquals(Some(offset), cachedOffsets.get(topicPartition2).map(_.offset)) + } + + @Test + def testExpireGroup() { + val memberId = "" + val generationId = -1 + val topicPartition1 = new TopicPartition("foo", 0) + val topicPartition2 = new TopicPartition("foo", 1) + val offset = 37 + + groupMetadataManager.addPartitionOwnership(groupPartitionId) + + val group = new GroupMetadata(groupId) + groupMetadataManager.addGroup(group) + + // expire the offset after 1 millisecond + val startMs = time.milliseconds + val offsets = immutable.Map( + topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1), + topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3)) + + EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andStubReturn(Some(partition)) + expectAppendMessage(Errors.NONE) + EasyMock.replay(replicaManager) + + var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None + def callback(errors: immutable.Map[TopicPartition, Short]) { + commitErrors = Some(errors) + } + + val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback) + assertTrue(group.hasOffsets) + + groupMetadataManager.store(delayedStore) + assertFalse(commitErrors.isEmpty) + assertEquals(Some(Errors.NONE.code), commitErrors.get.get(topicPartition1)) + + // expire all of the offsets + time.sleep(4) + + // expect the offset tombstone + EasyMock.reset(partition) + EasyMock.expect(partition.appendMessagesToLeader(EasyMock.anyObject(classOf[ByteBufferMessageSet]), EasyMock.anyInt())) + .andReturn(LogAppendInfo.UnknownLogAppendInfo) + EasyMock.replay(partition) + + groupMetadataManager.cleanupGroupMetadata() + + // the full group should be gone since all offsets were removed + assertEquals(None, groupMetadataManager.getGroup(groupId)) + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset)) + } + + @Test + def testExpireOffsetsWithActiveGroup() { + val memberId = "memberId" + val clientId = "clientId" + val clientHost = "localhost" + val topicPartition1 = new TopicPartition("foo", 0) + val topicPartition2 = new TopicPartition("foo", 1) + val offset = 37 + + groupMetadataManager.addPartitionOwnership(groupPartitionId) + + val group = new GroupMetadata(groupId) + groupMetadataManager.addGroup(group) + + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeout, + protocolType, List(("protocol", Array[Byte]()))) + member.awaitingJoinCallback = (joinGroupResult: JoinGroupResult) => {} + group.add(memberId, member) + group.transitionTo(PreparingRebalance) + group.initNextGeneration() + + // expire the offset after 1 millisecond + val startMs = time.milliseconds + val offsets = immutable.Map( + topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1), + topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3)) + + EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andStubReturn(Some(partition)) + expectAppendMessage(Errors.NONE) + EasyMock.replay(replicaManager) + + var commitErrors: Option[immutable.Map[TopicPartition, Short]] = None + def callback(errors: immutable.Map[TopicPartition, Short]) { + commitErrors = Some(errors) + } + + val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, group.generationId, offsets, callback) + assertTrue(group.hasOffsets) + + groupMetadataManager.store(delayedStore) + assertFalse(commitErrors.isEmpty) + assertEquals(Some(Errors.NONE.code), commitErrors.get.get(topicPartition1)) + + // expire all of the offsets + time.sleep(4) + + // expect the offset tombstone + EasyMock.reset(partition) + EasyMock.expect(partition.appendMessagesToLeader(EasyMock.anyObject(classOf[ByteBufferMessageSet]), EasyMock.anyInt())) + .andReturn(LogAppendInfo.UnknownLogAppendInfo) + EasyMock.replay(partition) + + groupMetadataManager.cleanupGroupMetadata() + + // group should still be there, but the offsets should be gone + assertEquals(Some(group), groupMetadataManager.getGroup(groupId)) + assertEquals(None, group.offset(topicPartition1)) + assertEquals(None, group.offset(topicPartition2)) + + val cachedOffsets = groupMetadataManager.getOffsets(groupId, Seq(topicPartition1, topicPartition2)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset)) + } + + private def expectAppendMessage(error: Errors) { + val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() + EasyMock.expect(replicaManager.appendMessages(EasyMock.anyLong(), + EasyMock.anyShort(), + EasyMock.anyBoolean(), + EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]], + EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] { + override def answer = capturedArgument.getValue.apply( + Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> + new PartitionResponse(error.code, 0L, Record.NO_TIMESTAMP) + ) + )}) + EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes() + } + + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala index 2846622..18dd143 100644 --- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala @@ -17,6 +17,8 @@ package kafka.coordinator +import kafka.common.OffsetAndMetadata +import org.apache.kafka.common.TopicPartition import org.junit.Assert._ import org.junit.{Before, Test} import org.scalatest.junit.JUnitSuite @@ -29,7 +31,7 @@ class GroupMetadataTest extends JUnitSuite { @Before def setUp() { - group = new GroupMetadata("groupId", "consumer") + group = new GroupMetadata("groupId") } @Test @@ -53,6 +55,7 @@ class GroupMetadataTest extends JUnitSuite { @Test def testCannotRebalanceWhenDead() { group.transitionTo(PreparingRebalance) + group.transitionTo(Empty) group.transitionTo(Dead) assertFalse(group.canRebalance) } @@ -64,6 +67,12 @@ class GroupMetadataTest extends JUnitSuite { } @Test + def testStableToDeadTransition() { + group.transitionTo(Dead) + assertState(group, Dead) + } + + @Test def testAwaitingSyncToPreparingRebalanceTransition() { group.transitionTo(PreparingRebalance) group.transitionTo(AwaitingSync) @@ -79,6 +88,21 @@ class GroupMetadataTest extends JUnitSuite { } @Test + def testPreparingRebalanceToEmptyTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Empty) + assertState(group, Empty) + } + + @Test + def testEmptyToDeadTransition() { + group.transitionTo(PreparingRebalance) + group.transitionTo(Empty) + group.transitionTo(Dead) + assertState(group, Dead) + } + + @Test def testAwaitingSyncToStableTransition() { group.transitionTo(PreparingRebalance) group.transitionTo(AwaitingSync) @@ -115,11 +139,11 @@ class GroupMetadataTest extends JUnitSuite { group.transitionTo(AwaitingSync) } - @Test(expected = classOf[IllegalStateException]) def testDeadToDeadIllegalTransition() { group.transitionTo(PreparingRebalance) group.transitionTo(Dead) group.transitionTo(Dead) + assertState(group, Dead) } @Test(expected = classOf[IllegalStateException]) @@ -145,6 +169,7 @@ class GroupMetadataTest extends JUnitSuite { @Test def testSelectProtocol() { + val protocolType = "consumer" val groupId = "groupId" val clientId = "clientId" val clientHost = "clientHost" @@ -152,14 +177,14 @@ class GroupMetadataTest extends JUnitSuite { val memberId = "memberId" val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, - List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) + protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) group.add(memberId, member) assertEquals("range", group.selectProtocol) val otherMemberId = "otherMemberId" val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs, - List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte]))) + protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte]))) group.add(otherMemberId, otherMember) // now could be either range or robin since there is no majority preference @@ -167,7 +192,7 @@ class GroupMetadataTest extends JUnitSuite { val lastMemberId = "lastMemberId" val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, sessionTimeoutMs, - List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte]))) + protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte]))) group.add(lastMemberId, lastMember) // now we should prefer 'roundrobin' @@ -182,6 +207,7 @@ class GroupMetadataTest extends JUnitSuite { @Test def testSelectProtocolChoosesCompatibleProtocol() { + val protocolType = "consumer" val groupId = "groupId" val clientId = "clientId" val clientHost = "clientHost" @@ -189,11 +215,11 @@ class GroupMetadataTest extends JUnitSuite { val memberId = "memberId" val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, - List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) + protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) val otherMemberId = "otherMemberId" val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs, - List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte]))) + protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte]))) group.add(memberId, member) group.add(otherMemberId, otherMember) @@ -202,6 +228,7 @@ class GroupMetadataTest extends JUnitSuite { @Test def testSupportsProtocols() { + val protocolType = "consumer" val groupId = "groupId" val clientId = "clientId" val clientHost = "clientHost" @@ -212,7 +239,7 @@ class GroupMetadataTest extends JUnitSuite { val memberId = "memberId" val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, - List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) + protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))) group.add(memberId, member) assertTrue(group.supportsProtocols(Set("roundrobin", "foo"))) @@ -221,7 +248,7 @@ class GroupMetadataTest extends JUnitSuite { val otherMemberId = "otherMemberId" val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, sessionTimeoutMs, - List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte]))) + protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte]))) group.add(otherMemberId, otherMember) @@ -229,6 +256,116 @@ class GroupMetadataTest extends JUnitSuite { assertFalse(group.supportsProtocols(Set("range", "foo"))) } + @Test + def testInitNextGeneration() { + val protocolType = "consumer" + val groupId = "groupId" + val clientId = "clientId" + val clientHost = "clientHost" + val sessionTimeoutMs = 10000 + val memberId = "memberId" + + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, + protocolType, List(("roundrobin", Array.empty[Byte]))) + + group.transitionTo(PreparingRebalance) + member.awaitingJoinCallback = (result) => {} + group.add(memberId, member) + + assertEquals(0, group.generationId) + assertNull(group.protocol) + + group.initNextGeneration() + + assertEquals(1, group.generationId) + assertEquals("roundrobin", group.protocol) + } + + @Test + def testInitNextGenerationEmptyGroup() { + assertEquals(Empty, group.currentState) + assertEquals(0, group.generationId) + assertNull(group.protocol) + + group.transitionTo(PreparingRebalance) + group.initNextGeneration() + + assertEquals(1, group.generationId) + assertNull(group.protocol) + } + + @Test + def testOffsetCommit(): Unit = { + val partition = new TopicPartition("foo", 0) + val offset = OffsetAndMetadata(37) + + group.prepareOffsetCommit(Map(partition -> offset)) + assertTrue(group.hasOffsets) + assertEquals(None, group.offset(partition)) + + group.completePendingOffsetWrite(partition, offset) + assertTrue(group.hasOffsets) + assertEquals(Some(offset), group.offset(partition)) + } + + @Test + def testOffsetCommitFailure(): Unit = { + val partition = new TopicPartition("foo", 0) + val offset = OffsetAndMetadata(37) + + group.prepareOffsetCommit(Map(partition -> offset)) + assertTrue(group.hasOffsets) + assertEquals(None, group.offset(partition)) + + group.failPendingOffsetWrite(partition, offset) + assertFalse(group.hasOffsets) + assertEquals(None, group.offset(partition)) + } + + @Test + def testOffsetCommitFailureWithAnotherPending(): Unit = { + val partition = new TopicPartition("foo", 0) + val firstOffset = OffsetAndMetadata(37) + val secondOffset = OffsetAndMetadata(57) + + group.prepareOffsetCommit(Map(partition -> firstOffset)) + assertTrue(group.hasOffsets) + assertEquals(None, group.offset(partition)) + + group.prepareOffsetCommit(Map(partition -> secondOffset)) + assertTrue(group.hasOffsets) + + group.failPendingOffsetWrite(partition, firstOffset) + assertTrue(group.hasOffsets) + assertEquals(None, group.offset(partition)) + + group.completePendingOffsetWrite(partition, secondOffset) + assertTrue(group.hasOffsets) + assertEquals(Some(secondOffset), group.offset(partition)) + } + + @Test + def testOffsetCommitWithAnotherPending(): Unit = { + val partition = new TopicPartition("foo", 0) + val firstOffset = OffsetAndMetadata(37) + val secondOffset = OffsetAndMetadata(57) + + group.prepareOffsetCommit(Map(partition -> firstOffset)) + assertTrue(group.hasOffsets) + assertEquals(None, group.offset(partition)) + + group.prepareOffsetCommit(Map(partition -> secondOffset)) + assertTrue(group.hasOffsets) + + group.completePendingOffsetWrite(partition, firstOffset) + assertTrue(group.hasOffsets) + assertEquals(Some(firstOffset), group.offset(partition)) + + group.completePendingOffsetWrite(partition, secondOffset) + assertTrue(group.hasOffsets) + assertEquals(Some(secondOffset), group.offset(partition)) + } + private def assertState(group: GroupMetadata, targetState: GroupState) { val states: Set[GroupState] = Set(Stable, PreparingRebalance, AwaitingSync, Dead) val otherStates = states - targetState http://git-wip-us.apache.org/repos/asf/kafka/blob/8c551675/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala index 88eb9ae..0688424 100644 --- a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala @@ -27,6 +27,7 @@ class MemberMetadataTest extends JUnitSuite { val clientId = "clientId" val clientHost = "clientHost" val memberId = "memberId" + val protocolType = "consumer" val sessionTimeoutMs = 10000 @@ -34,7 +35,7 @@ class MemberMetadataTest extends JUnitSuite { def testMatchesSupportedProtocols { val protocols = List(("range", Array.empty[Byte])) - val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols) + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols) assertTrue(member.matches(protocols)) assertFalse(member.matches(List(("range", Array[Byte](0))))) assertFalse(member.matches(List(("roundrobin", Array.empty[Byte])))) @@ -45,7 +46,7 @@ class MemberMetadataTest extends JUnitSuite { def testVoteForPreferredProtocol { val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])) - val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols) + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols) assertEquals("range", member.vote(Set("range", "roundrobin"))) assertEquals("roundrobin", member.vote(Set("blah", "roundrobin"))) } @@ -54,7 +55,7 @@ class MemberMetadataTest extends JUnitSuite { def testMetadata { val protocols = List(("range", Array[Byte](0)), ("roundrobin", Array[Byte](1))) - val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols) + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols) assertTrue(util.Arrays.equals(Array[Byte](0), member.metadata("range"))) assertTrue(util.Arrays.equals(Array[Byte](1), member.metadata("roundrobin"))) } @@ -63,7 +64,7 @@ class MemberMetadataTest extends JUnitSuite { def testMetadataRaisesOnUnsupportedProtocol { val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])) - val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols) + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols) member.metadata("blah") fail() } @@ -72,7 +73,7 @@ class MemberMetadataTest extends JUnitSuite { def testVoteRaisesOnNoSupportedProtocols { val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])) - val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocols) + val member = new MemberMetadata(memberId, groupId, clientId, clientHost, sessionTimeoutMs, protocolType, protocols) member.vote(Set("blah")) fail() }
