Repository: kafka Updated Branches: refs/heads/trunk 88200938f -> 38f6cae9e
http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala new file mode 100644 index 0000000..358e12c --- /dev/null +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -0,0 +1,1492 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.coordinator.group + +import kafka.common.OffsetAndMetadata +import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager} +import kafka.utils._ +import kafka.utils.timer.MockTimer +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{MemoryRecords, RecordBatch} +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult} +import org.easymock.{Capture, EasyMock, IAnswer} +import java.util.concurrent.TimeUnit + +import org.apache.kafka.common.internals.Topic +import org.junit.Assert._ +import org.junit.{After, Assert, Before, Test} +import org.scalatest.junit.JUnitSuite + +import scala.collection._ +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future, Promise, TimeoutException} + +class GroupCoordinatorTest extends JUnitSuite { + type JoinGroupCallback = JoinGroupResult => Unit + type SyncGroupCallbackParams = (Array[Byte], Errors) + type SyncGroupCallback = (Array[Byte], Errors) => Unit + type HeartbeatCallbackParams = Errors + type HeartbeatCallback = Errors => Unit + type CommitOffsetCallbackParams = Map[TopicPartition, Errors] + type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit + type LeaveGroupCallbackParams = Errors + type LeaveGroupCallback = Errors => Unit + + val ClientId = "consumer-test" + val ClientHost = "localhost" + val ConsumerMinSessionTimeout = 10 + val ConsumerMaxSessionTimeout = 1000 + val DefaultRebalanceTimeout = 500 + val DefaultSessionTimeout = 500 + val GroupInitialRebalanceDelay = 50 + var timer: MockTimer = null + var groupCoordinator: GroupCoordinator = null + var replicaManager: ReplicaManager = null + var scheduler: KafkaScheduler = null + var zkUtils: ZkUtils = null + + private val groupId = "groupId" + private val protocolType = "consumer" + 
private val memberId = "memberId" + private val metadata = Array[Byte]() + private val protocols = List(("range", metadata)) + private var groupPartitionId: Int = -1 + + // we use this string value since its hashcode % #.partitions is different + private val otherGroupId = "otherGroup" + + @Before + def setUp() { + val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") + props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString) + props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString) + props.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, GroupInitialRebalanceDelay.toString) + // make two partitions of the group topic to make sure some partitions are not owned by the coordinator + val ret = mutable.Map[String, Map[Int, Seq[Int]]]() + ret += (Topic.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1))) + + replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager]) + + zkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) + // make two partitions of the group topic to make sure some partitions are not owned by the coordinator + EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2)) + EasyMock.replay(zkUtils) + + timer = new MockTimer + + val config = KafkaConfig.fromProps(props) + + val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false) + val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false) + + groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time) + groupCoordinator.startup(false) + + // add the partition into the owned partition list + groupPartitionId = groupCoordinator.partitionFor(groupId) + groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId) + } + + @After + def tearDown() { + 
EasyMock.reset(replicaManager) + if (groupCoordinator != null) + groupCoordinator.shutdown() + } + + @Test + def testOffsetsRetentionMsIntegerOverflow() { + val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "") + props.setProperty(KafkaConfig.OffsetsRetentionMinutesProp, Integer.MAX_VALUE.toString) + val config = KafkaConfig.fromProps(props) + val offsetConfig = GroupCoordinator.offsetConfig(config) + assertEquals(offsetConfig.offsetsRetentionMs, Integer.MAX_VALUE * 60L * 1000L) + } + + @Test + def testJoinGroupWrongCoordinator() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(otherGroupId, memberId, protocolType, protocols) + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NOT_COORDINATOR, joinGroupError) + } + + @Test + def testJoinGroupSessionTimeoutTooSmall() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMinSessionTimeout - 1) + val joinGroupError = joinGroupResult.error + assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError) + } + + @Test + def testJoinGroupSessionTimeoutTooLarge() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMaxSessionTimeout + 1) + val joinGroupError = joinGroupResult.error + assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError) + } + + @Test + def testJoinGroupUnknownConsumerNewGroup() { + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val joinGroupError = joinGroupResult.error + assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupError) + } + + @Test + def testInvalidGroupId() { + val groupId = "" + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + assertEquals(Errors.INVALID_GROUP_ID, joinGroupResult.error) + } + + @Test + 
def testValidJoinGroup() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + } + + @Test + def testJoinGroupInconsistentProtocolType() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + assertEquals(Errors.NONE, joinGroupResult.error) + + EasyMock.reset(replicaManager) + val otherJoinGroupResult = await(sendJoinGroup(groupId, otherMemberId, "connect", protocols), 1) + assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, otherJoinGroupResult.error) + } + + @Test + def testJoinGroupInconsistentGroupProtocol() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupFuture = sendJoinGroup(groupId, memberId, protocolType, List(("range", metadata))) + + EasyMock.reset(replicaManager) + val otherJoinGroupResult = joinGroup(groupId, otherMemberId, protocolType, List(("roundrobin", metadata))) + + val joinGroupResult = await(joinGroupFuture, 1) + assertEquals(Errors.NONE, joinGroupResult.error) + assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, otherJoinGroupResult.error) + } + + @Test + def testJoinGroupUnknownConsumerExistingGroup() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val otherMemberId = "memberId" + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + assertEquals(Errors.NONE, joinGroupResult.error) + + EasyMock.reset(replicaManager) + val otherJoinGroupResult = await(sendJoinGroup(groupId, otherMemberId, protocolType, protocols), 1) + assertEquals(Errors.UNKNOWN_MEMBER_ID, otherJoinGroupResult.error) + } + + @Test + def testHeartbeatWrongCoordinator() { + + val heartbeatResult = heartbeat(otherGroupId, memberId, -1) + 
assertEquals(Errors.NOT_COORDINATOR, heartbeatResult) + } + + @Test + def testHeartbeatUnknownGroup() { + + val heartbeatResult = heartbeat(groupId, memberId, -1) + assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) + } + + @Test + def testHeartbeatUnknownConsumerExistingGroup() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val otherMemberId = "memberId" + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) + val syncGroupError = syncGroupResult._2 + assertEquals(Errors.NONE, syncGroupError) + + EasyMock.reset(replicaManager) + val heartbeatResult = heartbeat(groupId, otherMemberId, 1) + assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) + } + + @Test + def testHeartbeatRebalanceInProgress() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val heartbeatResult = heartbeat(groupId, assignedMemberId, 2) + assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) + } + + @Test + def testHeartbeatIllegalGeneration() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) + val 
syncGroupError = syncGroupResult._2 + assertEquals(Errors.NONE, syncGroupError) + + EasyMock.reset(replicaManager) + val heartbeatResult = heartbeat(groupId, assignedMemberId, 2) + assertEquals(Errors.ILLEGAL_GENERATION, heartbeatResult) + } + + @Test + def testValidHeartbeat() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) + val syncGroupError = syncGroupResult._2 + assertEquals(Errors.NONE, syncGroupError) + + EasyMock.reset(replicaManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) + assertEquals(Errors.NONE, heartbeatResult) + } + + @Test + def testSessionTimeout() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) + assertEquals(Errors.NONE, syncGroupError) + + EasyMock.reset(replicaManager) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None) + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes() + EasyMock.replay(replicaManager) + + timer.advanceClock(DefaultSessionTimeout + 100) + + EasyMock.reset(replicaManager) + val 
heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) + assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) + } + + @Test + def testHeartbeatMaintainsSession() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val sessionTimeout = 1000 + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, + rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) + assertEquals(Errors.NONE, syncGroupError) + + timer.advanceClock(sessionTimeout / 2) + + EasyMock.reset(replicaManager) + var heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) + assertEquals(Errors.NONE, heartbeatResult) + + timer.advanceClock(sessionTimeout / 2 + 100) + + EasyMock.reset(replicaManager) + heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) + assertEquals(Errors.NONE, heartbeatResult) + } + + @Test + def testCommitMaintainsSession() { + val sessionTimeout = 1000 + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, + rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) + assertEquals(Errors.NONE, syncGroupError) + + timer.advanceClock(sessionTimeout / 2) + + 
EasyMock.reset(replicaManager) + val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, immutable.Map(tp -> offset)) + assertEquals(Errors.NONE, commitOffsetResult(tp)) + + timer.advanceClock(sessionTimeout / 2 + 100) + + EasyMock.reset(replicaManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) + assertEquals(Errors.NONE, heartbeatResult) + } + + @Test + def testSessionTimeoutDuringRebalance() { + // create a group with a single member + val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, + rebalanceTimeout = 2000, sessionTimeout = 1000) + val firstMemberId = firstJoinResult.memberId + val firstGenerationId = firstJoinResult.generationId + assertEquals(firstMemberId, firstJoinResult.leaderId) + assertEquals(Errors.NONE, firstJoinResult.error) + + EasyMock.reset(replicaManager) + val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]())) + assertEquals(Errors.NONE, firstSyncResult._2) + + // now have a new member join to trigger a rebalance + EasyMock.reset(replicaManager) + val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + + timer.advanceClock(500) + + EasyMock.reset(replicaManager) + var heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) + assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) + + // letting the session expire should make the member fall out of the group + timer.advanceClock(1100) + + EasyMock.reset(replicaManager) + heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) + assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult) + + // and the rebalance should complete with only the new member + val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE, otherJoinResult.error) + } + + @Test + def testRebalanceCompletesBeforeMemberJoins() { + // create a 
group with a single member + val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, + rebalanceTimeout = 1200, sessionTimeout = 1000) + val firstMemberId = firstJoinResult.memberId + val firstGenerationId = firstJoinResult.generationId + assertEquals(firstMemberId, firstJoinResult.leaderId) + assertEquals(Errors.NONE, firstJoinResult.error) + + EasyMock.reset(replicaManager) + val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]())) + assertEquals(Errors.NONE, firstSyncResult._2) + + // now have a new member join to trigger a rebalance + EasyMock.reset(replicaManager) + val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + + // send a couple heartbeats to keep the member alive while the rebalance finishes + timer.advanceClock(500) + EasyMock.reset(replicaManager) + var heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) + assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) + + timer.advanceClock(500) + EasyMock.reset(replicaManager) + heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId) + assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) + + // now timeout the rebalance, which should kick the unjoined member out of the group + // and let the rebalance finish with only the new member + timer.advanceClock(500) + val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE, otherJoinResult.error) + } + + @Test + def testSyncGroupEmptyAssignment() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val 
syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map()) + val syncGroupError = syncGroupResult._2 + assertEquals(Errors.NONE, syncGroupError) + assertTrue(syncGroupResult._1.isEmpty) + + EasyMock.reset(replicaManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1) + assertEquals(Errors.NONE, heartbeatResult) + } + + @Test + def testSyncGroupNotCoordinator() { + val generation = 1 + + val syncGroupResult = syncGroupFollower(otherGroupId, generation, memberId) + assertEquals(Errors.NOT_COORDINATOR, syncGroupResult._2) + } + + @Test + def testSyncGroupFromUnknownGroup() { + val generation = 1 + + val syncGroupResult = syncGroupFollower(groupId, generation, memberId) + assertEquals(Errors.UNKNOWN_MEMBER_ID, syncGroupResult._2) + } + + @Test + def testSyncGroupFromUnknownMember() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + assertEquals(Errors.NONE, joinGroupResult.error) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) + val syncGroupError = syncGroupResult._2 + assertEquals(Errors.NONE, syncGroupError) + + EasyMock.reset(replicaManager) + val unknownMemberId = "blah" + val unknownMemberSyncResult = syncGroupFollower(groupId, generationId, unknownMemberId) + assertEquals(Errors.UNKNOWN_MEMBER_ID, unknownMemberSyncResult._2) + } + + @Test + def testSyncGroupFromIllegalGeneration() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedConsumerId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + assertEquals(Errors.NONE, joinGroupResult.error) + + EasyMock.reset(replicaManager) + // send the sync group with 
an invalid generation + val syncGroupResult = syncGroupLeader(groupId, generationId+1, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]())) + assertEquals(Errors.ILLEGAL_GENERATION, syncGroupResult._2) + } + + @Test + def testJoinGroupFromUnchangedFollowerDoesNotRebalance() { + // to get a group of two members: + // 1. join and sync with a single member (because we can't immediately join with two members) + // 2. join and sync with the first member and a new member + + val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + val firstMemberId = firstJoinResult.memberId + val firstGenerationId = firstJoinResult.generationId + assertEquals(firstMemberId, firstJoinResult.leaderId) + assertEquals(Errors.NONE, firstJoinResult.error) + + EasyMock.reset(replicaManager) + val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]())) + assertEquals(Errors.NONE, firstSyncResult._2) + + EasyMock.reset(replicaManager) + val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + + EasyMock.reset(replicaManager) + val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols) + + val joinResult = await(joinFuture, DefaultSessionTimeout+100) + val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE, joinResult.error) + assertEquals(Errors.NONE, otherJoinResult.error) + assertTrue(joinResult.generationId == otherJoinResult.generationId) + + assertEquals(firstMemberId, joinResult.leaderId) + assertEquals(firstMemberId, otherJoinResult.leaderId) + + val nextGenerationId = joinResult.generationId + + // this shouldn't cause a rebalance since protocol information hasn't changed + EasyMock.reset(replicaManager) + val followerJoinResult = await(sendJoinGroup(groupId, otherJoinResult.memberId, protocolType, protocols), 1) + + assertEquals(Errors.NONE, 
followerJoinResult.error) + assertEquals(nextGenerationId, followerJoinResult.generationId) + } + + @Test + def testJoinGroupFromUnchangedLeaderShouldRebalance() { + val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + val firstMemberId = firstJoinResult.memberId + val firstGenerationId = firstJoinResult.generationId + assertEquals(firstMemberId, firstJoinResult.leaderId) + assertEquals(Errors.NONE, firstJoinResult.error) + + EasyMock.reset(replicaManager) + val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]())) + assertEquals(Errors.NONE, firstSyncResult._2) + + // join groups from the leader should force the group to rebalance, which allows the + // leader to push new assignments when local metadata changes + + EasyMock.reset(replicaManager) + val secondJoinResult = await(sendJoinGroup(groupId, firstMemberId, protocolType, protocols), 1) + + assertEquals(Errors.NONE, secondJoinResult.error) + assertNotEquals(firstGenerationId, secondJoinResult.generationId) + } + + @Test + def testLeaderFailureInSyncGroup() { + // to get a group of two members: + // 1. join and sync with a single member (because we can't immediately join with two members) + // 2. 
join and sync with the first member and a new member + + val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + val firstMemberId = firstJoinResult.memberId + val firstGenerationId = firstJoinResult.generationId + assertEquals(firstMemberId, firstJoinResult.leaderId) + assertEquals(Errors.NONE, firstJoinResult.error) + + EasyMock.reset(replicaManager) + val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]())) + assertEquals(Errors.NONE, firstSyncResult._2) + + EasyMock.reset(replicaManager) + val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + + EasyMock.reset(replicaManager) + val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols) + + val joinResult = await(joinFuture, DefaultSessionTimeout+100) + val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE, joinResult.error) + assertEquals(Errors.NONE, otherJoinResult.error) + assertTrue(joinResult.generationId == otherJoinResult.generationId) + + assertEquals(firstMemberId, joinResult.leaderId) + assertEquals(firstMemberId, otherJoinResult.leaderId) + + val nextGenerationId = joinResult.generationId + + // with no leader SyncGroup, the follower's request should failure with an error indicating + // that it should rejoin + EasyMock.reset(replicaManager) + val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId) + + timer.advanceClock(DefaultSessionTimeout + 100) + + val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100) + assertEquals(Errors.REBALANCE_IN_PROGRESS, followerSyncResult._2) + } + + @Test + def testSyncGroupFollowerAfterLeader() { + // to get a group of two members: + // 1. join and sync with a single member (because we can't immediately join with two members) + // 2. 
join and sync with the first member and a new member + + val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + val firstMemberId = firstJoinResult.memberId + val firstGenerationId = firstJoinResult.generationId + assertEquals(firstMemberId, firstJoinResult.leaderId) + assertEquals(Errors.NONE, firstJoinResult.error) + + EasyMock.reset(replicaManager) + val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]())) + assertEquals(Errors.NONE, firstSyncResult._2) + + EasyMock.reset(replicaManager) + val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + + EasyMock.reset(replicaManager) + val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols) + + val joinResult = await(joinFuture, DefaultSessionTimeout+100) + val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE, joinResult.error) + assertEquals(Errors.NONE, otherJoinResult.error) + assertTrue(joinResult.generationId == otherJoinResult.generationId) + + assertEquals(firstMemberId, joinResult.leaderId) + assertEquals(firstMemberId, otherJoinResult.leaderId) + + val nextGenerationId = joinResult.generationId + val leaderId = firstMemberId + val leaderAssignment = Array[Byte](0) + val followerId = otherJoinResult.memberId + val followerAssignment = Array[Byte](1) + + EasyMock.reset(replicaManager) + val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId, + Map(leaderId -> leaderAssignment, followerId -> followerAssignment)) + assertEquals(Errors.NONE, leaderSyncResult._2) + assertEquals(leaderAssignment, leaderSyncResult._1) + + EasyMock.reset(replicaManager) + val followerSyncResult = syncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId) + assertEquals(Errors.NONE, followerSyncResult._2) + assertEquals(followerAssignment, followerSyncResult._1) + } + + @Test + 
def testSyncGroupLeaderAfterFollower() { + // to get a group of two members: + // 1. join and sync with a single member (because we can't immediately join with two members) + // 2. join and sync with the first member and a new member + + val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + val firstMemberId = joinGroupResult.memberId + val firstGenerationId = joinGroupResult.generationId + assertEquals(firstMemberId, joinGroupResult.leaderId) + assertEquals(Errors.NONE, joinGroupResult.error) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]())) + val syncGroupError = syncGroupResult._2 + assertEquals(Errors.NONE, syncGroupError) + + EasyMock.reset(replicaManager) + val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + + EasyMock.reset(replicaManager) + val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols) + + val joinResult = await(joinFuture, DefaultSessionTimeout+100) + val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE, joinResult.error) + assertEquals(Errors.NONE, otherJoinResult.error) + assertTrue(joinResult.generationId == otherJoinResult.generationId) + + val nextGenerationId = joinResult.generationId + val leaderId = joinResult.leaderId + val leaderAssignment = Array[Byte](0) + val followerId = otherJoinResult.memberId + val followerAssignment = Array[Byte](1) + + assertEquals(firstMemberId, joinResult.leaderId) + assertEquals(firstMemberId, otherJoinResult.leaderId) + + EasyMock.reset(replicaManager) + val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, followerId) + + EasyMock.reset(replicaManager) + val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId, + Map(leaderId -> leaderAssignment, followerId -> followerAssignment)) + 
assertEquals(Errors.NONE, leaderSyncResult._2) + assertEquals(leaderAssignment, leaderSyncResult._1) + + val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100) + assertEquals(Errors.NONE, followerSyncResult._2) + assertEquals(followerAssignment, followerSyncResult._1) + } + + @Test + def testCommitOffsetFromUnknownGroup() { + val generationId = 1 + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + + val commitOffsetResult = commitOffsets(groupId, memberId, generationId, immutable.Map(tp -> offset)) + assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp)) + } + + @Test + def testCommitOffsetWithDefaultGeneration() { + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + + val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, + OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset)) + assertEquals(Errors.NONE, commitOffsetResult(tp)) + } + + @Test + def testFetchOffsets() { + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + + val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, + OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset)) + assertEquals(Errors.NONE, commitOffsetResult(tp)) + + val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, error) + assertEquals(Some(0), partitionData.get(tp).map(_.offset)) + } + + @Test + def testBasicFetchTxnOffsets() { + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + val producerId = 1000L + val producerEpoch : Short = 2 + + val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset)) + assertEquals(Errors.NONE, commitOffsetResult(tp)) + + val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + + // Validate that the offset isn't 
materialized yet. + assertEquals(Errors.NONE, error) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset)) + + val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) + + // Send commit marker. + groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT) + + // Validate that committed offset is materialized. + val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, secondReqError) + assertEquals(Some(0), secondReqPartitionData.get(tp).map(_.offset)) + } + + @Test + def testFetchTxnOffsetsWithAbort() { + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + val producerId = 1000L + val producerEpoch : Short = 2 + + val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset)) + assertEquals(Errors.NONE, commitOffsetResult(tp)) + + val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, error) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset)) + + val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) + + // Validate that the pending commit is discarded. 
+ groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT) + + val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, secondReqError) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), secondReqPartitionData.get(tp).map(_.offset)) + } + + @Test + def testFetchTxnOffsetsIgnoreSpuriousCommit() { + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + val producerId = 1000L + val producerEpoch : Short = 2 + + val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset)) + assertEquals(Errors.NONE, commitOffsetResult(tp)) + + val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, error) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset)) + + val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) + groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT) + + val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, secondReqError) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), secondReqPartitionData.get(tp).map(_.offset)) + + // Ignore spurious commit. + groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT) + + val (thirdReqError, thirdReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, secondReqError) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), thirdReqPartitionData.get(tp).map(_.offset)) + } + + @Test + def testFetchTxnOffsetsOneProducerMultipleGroups() { + // One producer, two groups located on separate offsets topic partitions. + // Both group have pending offset commits. 
+ // Marker for only one partition is received. That commit should be materialized while the other should not. + + val partitions = List(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)) + val offsets = List(OffsetAndMetadata(10), OffsetAndMetadata(15)) + val producerId = 1000L + val producerEpoch: Short = 3 + + val groupIds = List(groupId, otherGroupId) + val offsetTopicPartitions = List(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)), + new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(otherGroupId))) + + groupCoordinator.groupManager.addPartitionOwnership(offsetTopicPartitions(1).partition) + val errors = mutable.ArrayBuffer[Errors]() + val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]() + + val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]() + + // Ensure that the two groups map to different partitions. + assertNotEquals(offsetTopicPartitions(0), offsetTopicPartitions(1)) + + commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(partitions(0) -> offsets(0)))) + assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0))) + commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, producerId, producerEpoch, immutable.Map(partitions(1) -> offsets(1)))) + assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1))) + + // We got a commit for only one __consumer_offsets partition. We should only materialize its group offsets. 
+ groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT) + groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match { + case (error, partData) => + errors.append(error) + partitionData.append(partData) + case _ => + } + + groupCoordinator.handleFetchOffsets(groupIds(1), Some(partitions)) match { + case (error, partData) => + errors.append(error) + partitionData.append(partData) + case _ => + } + + assertEquals(2, errors.size) + assertEquals(Errors.NONE, errors(0)) + assertEquals(Errors.NONE, errors(1)) + + // Exactly one offset commit should have been materialized. + assertEquals(Some(offsets(0).offset), partitionData(0).get(partitions(0)).map(_.offset)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(0).get(partitions(1)).map(_.offset)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(1).get(partitions(0)).map(_.offset)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(1).get(partitions(1)).map(_.offset)) + + // Now we receive the other marker. 
+ groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT) + errors.clear() + partitionData.clear() + groupCoordinator.handleFetchOffsets(groupIds(0), Some(partitions)) match { + case (error, partData) => + errors.append(error) + partitionData.append(partData) + case _ => + } + + groupCoordinator.handleFetchOffsets(groupIds(1), Some(partitions)) match { + case (error, partData) => + errors.append(error) + partitionData.append(partData) + case _ => + } + // Two offsets should have been materialized + assertEquals(Some(offsets(0).offset), partitionData(0).get(partitions(0)).map(_.offset)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(0).get(partitions(1)).map(_.offset)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(1).get(partitions(0)).map(_.offset)) + assertEquals(Some(offsets(1).offset), partitionData(1).get(partitions(1)).map(_.offset)) + } + + @Test + def testFetchTxnOffsetsMultipleProducersOneGroup() { + // One group, two producers + // Different producers will commit offsets for different partitions. + // Each partition's offsets should be materialized when the corresponding producer's marker is received. 
+ + val partitions = List(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)) + val offsets = List(OffsetAndMetadata(10), OffsetAndMetadata(15)) + val producerIds = List(1000L, 1005L) + val producerEpochs: Seq[Short] = List(3, 4) + + val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) + + val errors = mutable.ArrayBuffer[Errors]() + val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]() + + val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]() + + // producer0 commits the offsets for partition0 + commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0), immutable.Map(partitions(0) -> offsets(0)))) + assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0))) + + // producer1 commits the offsets for partition1 + commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1), immutable.Map(partitions(1) -> offsets(1)))) + assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1))) + + // producer0 commits its transaction. + groupCoordinator.handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT) + groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match { + case (error, partData) => + errors.append(error) + partitionData.append(partData) + case _ => + } + + assertEquals(Errors.NONE, errors(0)) + + // We should only see the offset commit for producer0 + assertEquals(Some(offsets(0).offset), partitionData(0).get(partitions(0)).map(_.offset)) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData(0).get(partitions(1)).map(_.offset)) + + // producer1 now commits its transaction. 
+ groupCoordinator.handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT) + + groupCoordinator.handleFetchOffsets(groupId, Some(partitions)) match { + case (error, partData) => + errors.append(error) + partitionData.append(partData) + case _ => + } + + assertEquals(Errors.NONE, errors(1)) + + // We should now see the offset commits for both producers. + assertEquals(Some(offsets(0).offset), partitionData(1).get(partitions(0)).map(_.offset)) + assertEquals(Some(offsets(1).offset), partitionData(1).get(partitions(1)).map(_.offset)) + } + + @Test + def testFetchOffsetForUnknownPartition(): Unit = { + val tp = new TopicPartition("topic", 0) + val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, error) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset)) + } + + @Test + def testFetchOffsetNotCoordinatorForGroup(): Unit = { + val tp = new TopicPartition("topic", 0) + val (error, partitionData) = groupCoordinator.handleFetchOffsets(otherGroupId, Some(Seq(tp))) + assertEquals(Errors.NOT_COORDINATOR, error) + assertTrue(partitionData.isEmpty) + } + + @Test + def testFetchAllOffsets() { + val tp1 = new TopicPartition("topic", 0) + val tp2 = new TopicPartition("topic", 1) + val tp3 = new TopicPartition("other-topic", 0) + val offset1 = OffsetAndMetadata(15) + val offset2 = OffsetAndMetadata(16) + val offset3 = OffsetAndMetadata(17) + + assertEquals((Errors.NONE, Map.empty), groupCoordinator.handleFetchOffsets(groupId)) + + val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, + OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp1 -> offset1, tp2 -> offset2, tp3 -> offset3)) + assertEquals(Errors.NONE, commitOffsetResult(tp1)) + assertEquals(Errors.NONE, commitOffsetResult(tp2)) + assertEquals(Errors.NONE, commitOffsetResult(tp3)) + + val (error, partitionData) = 
groupCoordinator.handleFetchOffsets(groupId) + assertEquals(Errors.NONE, error) + assertEquals(3, partitionData.size) + assertTrue(partitionData.forall(_._2.error == Errors.NONE)) + assertEquals(Some(offset1.offset), partitionData.get(tp1).map(_.offset)) + assertEquals(Some(offset2.offset), partitionData.get(tp2).map(_.offset)) + assertEquals(Some(offset3.offset), partitionData.get(tp3).map(_.offset)) + } + + @Test + def testCommitOffsetInAwaitingSync() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, immutable.Map(tp -> offset)) + assertEquals(Errors.REBALANCE_IN_PROGRESS, commitOffsetResult(tp)) + } + + @Test + def testHeartbeatDuringRebalanceCausesRebalanceInProgress() { + // First start up a group (with a slightly larger timeout to give us time to heartbeat when the rebalance starts) + val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + val assignedConsumerId = joinGroupResult.memberId + val initialGenerationId = joinGroupResult.generationId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + // Then join with a new consumer to trigger a rebalance + EasyMock.reset(replicaManager) + sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + + // We should be in the middle of a rebalance, so the heartbeat should return rebalance in progress + EasyMock.reset(replicaManager) + val heartbeatResult = heartbeat(groupId, assignedConsumerId, initialGenerationId) + 
assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult) + } + + @Test + def testGenerationIdIncrementsOnRebalance() { + val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + val initialGenerationId = joinGroupResult.generationId + val joinGroupError = joinGroupResult.error + val memberId = joinGroupResult.memberId + assertEquals(1, initialGenerationId) + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, initialGenerationId, memberId, Map(memberId -> Array[Byte]())) + val syncGroupError = syncGroupResult._2 + assertEquals(Errors.NONE, syncGroupError) + + EasyMock.reset(replicaManager) + val joinGroupFuture = sendJoinGroup(groupId, memberId, protocolType, protocols) + val otherJoinGroupResult = await(joinGroupFuture, 1) + + val nextGenerationId = otherJoinGroupResult.generationId + val otherJoinGroupError = otherJoinGroupResult.error + assertEquals(2, nextGenerationId) + assertEquals(Errors.NONE, otherJoinGroupError) + } + + @Test + def testLeaveGroupWrongCoordinator() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + + val leaveGroupResult = leaveGroup(otherGroupId, memberId) + assertEquals(Errors.NOT_COORDINATOR, leaveGroupResult) + } + + @Test + def testLeaveGroupUnknownGroup() { + + val leaveGroupResult = leaveGroup(groupId, memberId) + assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult) + } + + @Test + def testLeaveGroupUnknownConsumerExistingGroup() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val otherMemberId = "consumerId" + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val leaveGroupResult = leaveGroup(groupId, otherMemberId) + assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveGroupResult) + } + + @Test + def testValidLeaveGroup() { + val memberId = 
JoinGroupRequest.UNKNOWN_MEMBER_ID + + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val leaveGroupResult = leaveGroup(groupId, assignedMemberId) + assertEquals(Errors.NONE, leaveGroupResult) + } + + @Test + def testListGroupsIncludesStableGroups() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + assertEquals(Errors.NONE, joinGroupResult.error) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) + val syncGroupError = syncGroupResult._2 + assertEquals(Errors.NONE, syncGroupError) + + val (error, groups) = groupCoordinator.handleListGroups() + assertEquals(Errors.NONE, error) + assertEquals(1, groups.size) + assertEquals(GroupOverview("groupId", "consumer"), groups.head) + } + + @Test + def testListGroupsIncludesRebalancingGroups() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + assertEquals(Errors.NONE, joinGroupResult.error) + + val (error, groups) = groupCoordinator.handleListGroups() + assertEquals(Errors.NONE, error) + assertEquals(1, groups.size) + assertEquals(GroupOverview("groupId", "consumer"), groups.head) + } + + @Test + def testDescribeGroupWrongCoordinator() { + EasyMock.reset(replicaManager) + val (error, _) = groupCoordinator.handleDescribeGroup(otherGroupId) + assertEquals(Errors.NOT_COORDINATOR, error) + } + + @Test + def testDescribeGroupInactiveGroup() { + EasyMock.reset(replicaManager) + val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + 
assertEquals(Errors.NONE, error) + assertEquals(GroupCoordinator.DeadGroup, summary) + } + + @Test + def testDescribeGroupStable() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val generationId = joinGroupResult.generationId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val syncGroupResult = syncGroupLeader(groupId, generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]())) + + val syncGroupError = syncGroupResult._2 + assertEquals(Errors.NONE, syncGroupError) + + EasyMock.reset(replicaManager) + val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + assertEquals(Errors.NONE, error) + assertEquals(protocolType, summary.protocolType) + assertEquals("range", summary.protocol) + assertEquals(List(assignedMemberId), summary.members.map(_.memberId)) + } + + @Test + def testDescribeGroupRebalancing() { + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + EasyMock.reset(replicaManager) + val (error, summary) = groupCoordinator.handleDescribeGroup(groupId) + assertEquals(Errors.NONE, error) + assertEquals(protocolType, summary.protocolType) + assertEquals(GroupCoordinator.NoProtocol, summary.protocol) + assertEquals(AwaitingSync.toString, summary.state) + assertTrue(summary.members.map(_.memberId).contains(joinGroupResult.memberId)) + assertTrue(summary.members.forall(_.metadata.isEmpty)) + assertTrue(summary.members.forall(_.assignment.isEmpty)) + } + + @Test + def shouldDelayInitialRebalanceByGroupInitialRebalanceDelayOnEmptyGroup() { + val firstJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols) + 
timer.advanceClock(GroupInitialRebalanceDelay / 2) + verifyDelayedTaskNotCompleted(firstJoinFuture) + timer.advanceClock((GroupInitialRebalanceDelay / 2) + 1) + val joinGroupResult = await(firstJoinFuture, 1) + assertEquals(Errors.NONE, joinGroupResult.error) + } + + private def verifyDelayedTaskNotCompleted(firstJoinFuture: Future[JoinGroupResult]) = { + try { + await(firstJoinFuture, 1) + Assert.fail("should have timed out as rebalance delay not expired") + } catch { + case _: TimeoutException => // ok + } + } + + @Test + def shouldResetRebalanceDelayWhenNewMemberJoinsGroupInInitialRebalance() { + val rebalanceTimeout = GroupInitialRebalanceDelay * 3 + val firstMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout) + EasyMock.reset(replicaManager) + timer.advanceClock(GroupInitialRebalanceDelay - 1) + val secondMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout) + EasyMock.reset(replicaManager) + timer.advanceClock(2) + + // advance past initial rebalance delay and make sure that tasks + // haven't been completed + timer.advanceClock(GroupInitialRebalanceDelay / 2 + 1) + verifyDelayedTaskNotCompleted(firstMemberJoinFuture) + verifyDelayedTaskNotCompleted(secondMemberJoinFuture) + // advance clock beyond updated delay and make sure the + // tasks have completed + timer.advanceClock(GroupInitialRebalanceDelay / 2) + val firstResult = await(firstMemberJoinFuture, 1) + val secondResult = await(secondMemberJoinFuture, 1) + assertEquals(Errors.NONE, firstResult.error) + assertEquals(Errors.NONE, secondResult.error) + } + + @Test + def shouldDelayRebalanceUptoRebalanceTimeout() { + val rebalanceTimeout = GroupInitialRebalanceDelay * 2 + val firstMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout) + EasyMock.reset(replicaManager) + val secondMemberJoinFuture = 
sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout) + timer.advanceClock(GroupInitialRebalanceDelay + 1) + EasyMock.reset(replicaManager) + val thirdMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout) + timer.advanceClock(GroupInitialRebalanceDelay) + EasyMock.reset(replicaManager) + + verifyDelayedTaskNotCompleted(firstMemberJoinFuture) + verifyDelayedTaskNotCompleted(secondMemberJoinFuture) + verifyDelayedTaskNotCompleted(thirdMemberJoinFuture) + + // advance clock beyond rebalanceTimeout + timer.advanceClock(1) + + val firstResult = await(firstMemberJoinFuture, 1) + val secondResult = await(secondMemberJoinFuture, 1) + val thirdResult = await(thirdMemberJoinFuture, 1) + assertEquals(Errors.NONE, firstResult.error) + assertEquals(Errors.NONE, secondResult.error) + assertEquals(Errors.NONE, thirdResult.error) + } + + private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = { + val responsePromise = Promise[JoinGroupResult] + val responseFuture = responsePromise.future + val responseCallback: JoinGroupCallback = responsePromise.success(_) + (responseFuture, responseCallback) + } + + private def setupSyncGroupCallback: (Future[SyncGroupCallbackParams], SyncGroupCallback) = { + val responsePromise = Promise[SyncGroupCallbackParams] + val responseFuture = responsePromise.future + val responseCallback: SyncGroupCallback = (assignment, error) => + responsePromise.success((assignment, error)) + (responseFuture, responseCallback) + } + + private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = { + val responsePromise = Promise[HeartbeatCallbackParams] + val responseFuture = responsePromise.future + val responseCallback: HeartbeatCallback = error => responsePromise.success(error) + (responseFuture, responseCallback) + } + + private def setupCommitOffsetsCallback: 
(Future[CommitOffsetCallbackParams], CommitOffsetCallback) = { + val responsePromise = Promise[CommitOffsetCallbackParams] + val responseFuture = responsePromise.future + val responseCallback: CommitOffsetCallback = offsets => responsePromise.success(offsets) + (responseFuture, responseCallback) + } + + private def sendJoinGroup(groupId: String, + memberId: String, + protocolType: String, + protocols: List[(String, Array[Byte])], + rebalanceTimeout: Int = DefaultRebalanceTimeout, + sessionTimeout: Int = DefaultSessionTimeout): Future[JoinGroupResult] = { + val (responseFuture, responseCallback) = setupJoinGroupCallback + + EasyMock.replay(replicaManager) + + groupCoordinator.handleJoinGroup(groupId, memberId, "clientId", "clientHost", rebalanceTimeout, sessionTimeout, + protocolType, protocols, responseCallback) + responseFuture + } + + + private def sendSyncGroupLeader(groupId: String, + generation: Int, + leaderId: String, + assignment: Map[String, Array[Byte]]): Future[SyncGroupCallbackParams] = { + val (responseFuture, responseCallback) = setupSyncGroupCallback + + val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() + + EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), + EasyMock.anyShort(), + internalTopicsAllowed = EasyMock.eq(true), + isFromClient = EasyMock.eq(false), + EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], + EasyMock.capture(capturedArgument), + EasyMock.anyObject().asInstanceOf[Option[Object]])).andAnswer(new IAnswer[Unit] { + override def answer = capturedArgument.getValue.apply( + Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> + new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP) + ) + )}) + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes() + EasyMock.replay(replicaManager) + + groupCoordinator.handleSyncGroup(groupId, generation, leaderId, 
assignment, responseCallback)
    responseFuture
  }

  /**
   * Sends a follower SyncGroup (empty assignment — only the leader supplies one) and
   * returns a future that completes with the sync result.
   */
  private def sendSyncGroupFollower(groupId: String,
                                    generation: Int,
                                    memberId: String): Future[SyncGroupCallbackParams] = {
    val (future, callback) = setupSyncGroupCallback

    EasyMock.replay(replicaManager)

    groupCoordinator.handleSyncGroup(groupId, generation, memberId, Map.empty[String, Array[Byte]], callback)
    future
  }

  /**
   * Joins the group and blocks until the join completes, advancing the mock timer past the
   * initial rebalance delay so the first rebalance can finish.
   */
  private def joinGroup(groupId: String,
                        memberId: String,
                        protocolType: String,
                        protocols: List[(String, Array[Byte])],
                        sessionTimeout: Int = DefaultSessionTimeout,
                        rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
    val future = sendJoinGroup(groupId, memberId, protocolType, protocols, rebalanceTimeout, sessionTimeout)
    // The coordinator delays the first rebalance; skip the mock clock past that delay.
    timer.advanceClock(GroupInitialRebalanceDelay + 1)
    // Should only need to wait as long as the rebalance timeout, but pad it in case of an unexpected delay.
    await(future, rebalanceTimeout + 100)
  }


  /** Sends a follower SyncGroup and blocks for the result. */
  private def syncGroupFollower(groupId: String,
                                generationId: Int,
                                memberId: String,
                                sessionTimeout: Int = DefaultSessionTimeout): SyncGroupCallbackParams = {
    val future = sendSyncGroupFollower(groupId, generationId, memberId)
    await(future, sessionTimeout + 100)
  }

  /** Sends a leader SyncGroup carrying the full group assignment and blocks for the result. */
  private def syncGroupLeader(groupId: String,
                              generationId: Int,
                              memberId: String,
                              assignment: Map[String, Array[Byte]],
                              sessionTimeout: Int = DefaultSessionTimeout): SyncGroupCallbackParams = {
    val future = sendSyncGroupLeader(groupId, generationId, memberId, assignment)
    await(future, sessionTimeout + 100)
  }

  /** Sends a heartbeat for the given member/generation and blocks for the resulting error code. */
  private def heartbeat(groupId: String,
                        consumerId: String,
                        generationId: Int): HeartbeatCallbackParams = {
    val (future, callback) = setupHeartbeatCallback

    EasyMock.replay(replicaManager)

    groupCoordinator.handleHeartbeat(groupId, consumerId, generationId, callback)
    await(future, 40)
  }

  /** Blocks on `future` for at most `millis` milliseconds and returns its value. */
  private def await[T](future: Future[T], millis: Long): T =
    Await.result(future, Duration(millis, TimeUnit.MILLISECONDS))

  /**
   * Commits consumer offsets through the coordinator, stubbing the replica manager so the
   * log append completes immediately with a successful partition response.
   */
  private def commitOffsets(groupId: String,
                            consumerId: String,
                            generationId: Int,
                            offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
    val (future, callback) = setupCommitOffsetsCallback

    // Capture the append callback so the stub can complete the write successfully.
    val callbackCapture: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()

    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
      EasyMock.anyShort(),
      internalTopicsAllowed = EasyMock.eq(true),
      isFromClient = EasyMock.eq(false),
      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
      EasyMock.capture(callbackCapture),
      EasyMock.anyObject().asInstanceOf[Option[Object]])
    ).andAnswer(new IAnswer[Unit] {
      override def answer = callbackCapture.getValue.apply(
        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
        )
      )})
    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
    EasyMock.replay(replicaManager)

    groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, callback)
    await(future, 40)
  }

  /**
   * Commits transactional offsets for the given producer, stubbing a successful append.
   * Uses magic v2 (transactional records require it) and resets the mock afterwards so the
   * caller can set up further expectations.
   */
  private def commitTransactionalOffsets(groupId: String,
                                         producerId: Long,
                                         producerEpoch: Short,
                                         offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
    val (future, callback) = setupCommitOffsetsCallback

    val callbackCapture: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()

    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
      EasyMock.anyShort(),
      internalTopicsAllowed = EasyMock.eq(true),
      isFromClient = EasyMock.eq(false),
      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
      EasyMock.capture(callbackCapture),
      EasyMock.anyObject().asInstanceOf[Option[Object]])
    ).andAnswer(new IAnswer[Unit] {
      override def answer = callbackCapture.getValue.apply(
        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) ->
          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
        )
      )})
    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V2)).anyTimes()
    EasyMock.replay(replicaManager)

    groupCoordinator.handleTxnCommitOffsets(groupId, producerId, producerEpoch, offsets, callback)
    val result = await(future, 40)
    EasyMock.reset(replicaManager)
    result
  }

  /** Sends a LeaveGroup for the given member and blocks for the resulting error code. */
  private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = {
    val (future, callback) = setupHeartbeatCallback

    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None)
    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
    EasyMock.replay(replicaManager)

    groupCoordinator.handleLeaveGroup(groupId, consumerId, callback)
    await(future, 40)
  }

}

http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 0e13f89..2db6603 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala @@ -455,7 +455,7 @@ class GroupMetadataTest extends JUnitSuite { assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId)) assertTrue(group.hasOffsets) assertEquals(None, group.offset(partition)) - group.failPendingTxnOffsetCommit(producerId, partition, txnOffsetCommit) + group.failPendingTxnOffsetCommit(producerId, partition) assertFalse(group.hasOffsets) assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId)) http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 7e50049..fa2e55b 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -250,19 +250,20 @@ class RequestQuotaTest extends BaseRequestTest { new OffsetsForLeaderEpochRequest.Builder().add(tp, 0) case ApiKeys.ADD_PARTITIONS_TO_TXN => - new AddPartitionsToTxnRequest.Builder("txn1", 1, 0, List(tp).asJava) + new AddPartitionsToTxnRequest.Builder("test-transactional-id", 1, 0, List(tp).asJava) case ApiKeys.ADD_OFFSETS_TO_TXN => - new AddOffsetsToTxnRequest.Builder("txn1", 1, 0, "test-txn-group") + new AddOffsetsToTxnRequest.Builder("test-transactional-id", 1, 0, "test-txn-group") case ApiKeys.END_TXN => - new EndTxnRequest.Builder("txn1", 1, 0, TransactionResult.forId(false)) + new EndTxnRequest.Builder("test-transactional-id", 1, 0, TransactionResult.forId(false)) case ApiKeys.WRITE_TXN_MARKERS => new WriteTxnMarkersRequest.Builder(List.empty.asJava) case ApiKeys.TXN_OFFSET_COMMIT => - new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, Map.empty.asJava) + new 
TxnOffsetCommitRequest.Builder("test-transactional-id", "test-txn-group", 2, 0, + Map.empty[TopicPartition, TxnOffsetCommitRequest.CommittedOffset].asJava) case ApiKeys.DESCRIBE_ACLS => new DescribeAclsRequest.Builder(AclBindingFilter.ANY) http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 70c340b..054a4ff 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1340,21 +1340,19 @@ object TestUtils extends Logging { } // Seeds the given topic with records with keys and values in the range [0..numRecords) - def seedTopicWithNumberedRecords(topic: String, numRecords: Int, servers: Seq[KafkaServer]): Int = { + def seedTopicWithNumberedRecords(topic: String, numRecords: Int, servers: Seq[KafkaServer]): Unit = { val props = new Properties() props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") - var recordsWritten = 0 - val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = Integer.MAX_VALUE, acks = -1, props = Some(props)) + val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), + retries = Integer.MAX_VALUE, acks = -1, props = Some(props)) try { for (i <- 0 until numRecords) { producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, asBytes(i.toString), asBytes(i.toString))) - recordsWritten += 1 } producer.flush() } finally { producer.close() } - recordsWritten } private def asString(bytes: Array[Byte]) = new String(bytes, StandardCharsets.UTF_8) @@ -1404,7 +1402,7 @@ object TestUtils extends Logging { offsetsToCommit.toMap } - def pollUntilAtLeastNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) : 
Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = { + def pollUntilAtLeastNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = { val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]() TestUtils.waitUntilTrue(() => { records ++= consumer.poll(50)
