This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 7421f9d KAFKA-6773; Allow offset commit/fetch/describe/delete with empty groupId (#4851) 7421f9d is described below commit 7421f9dce2d2e763f70602b13df1efcd32f691b9 Author: Jason Gustafson <ja...@confluent.io> AuthorDate: Wed Apr 11 16:47:11 2018 -0700 KAFKA-6773; Allow offset commit/fetch/describe/delete with empty groupId (#4851) We had a regression in #4788 which caused the offset commit/fetch/describe APIs to fail if the groupId was empty. This should be allowed for backwards compatibility. Additionally, I have modified DeleteGroups to allow removal of the empty group, which was missed in the initial implementation. I've added a test case to ensure that we do not miss this again in the future. Reviewers: Ismael Juma <ism...@juma.me.uk> --- .../kafka/coordinator/group/GroupCoordinator.scala | 43 +++++---- .../kafka/admin/DeleteConsumerGroupsTest.scala | 26 ------ .../coordinator/group/GroupCoordinatorTest.scala | 100 ++++++++++++++------- 3 files changed, 96 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 225b709..cbbd913 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -27,7 +27,7 @@ import kafka.utils.Logging import kafka.zk.KafkaZkClient import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.internals.Topic -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID} import org.apache.kafka.common.requests._ import org.apache.kafka.common.utils.Time @@ -109,7 +109,7 @@ class GroupCoordinator(val brokerId: Int, protocolType: String, protocols: List[(String, Array[Byte])], responseCallback: JoinCallback): Unit = { - validateGroup(groupId).foreach { error => + validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error => responseCallback(joinError(memberId, error)) return } @@ -237,7 +237,7 @@ class GroupCoordinator(val brokerId: Int, memberId: String, groupAssignment: Map[String, Array[Byte]], responseCallback: SyncCallback): Unit = { - validateGroup(groupId) match { + validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match { case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS => // The coordinator is loading, which means we've lost the state of the active rebalance and the // group will need to start over at JoinGroup. By returning rebalance in progress, the consumer @@ -313,7 +313,7 @@ class GroupCoordinator(val brokerId: Int, } def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit): Unit = { - validateGroup(groupId).foreach { error => + validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).foreach { error => responseCallback(error) return } @@ -346,7 +346,7 @@ class GroupCoordinator(val brokerId: Int, var groupsEligibleForDeletion: Seq[GroupMetadata] = Seq() groupIds.foreach { groupId => - validateGroup(groupId) match { + validateGroupStatus(groupId, ApiKeys.DELETE_GROUPS) match { case Some(error) => groupErrors += groupId -> error @@ -386,7 +386,7 @@ class GroupCoordinator(val brokerId: Int, memberId: String, generationId: Int, responseCallback: Errors => Unit) { - validateGroup(groupId).foreach { error => + validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error => if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) // the group is still loading, so respond just blindly responseCallback(Errors.NONE) @@ -448,7 +448,7 @@ class GroupCoordinator(val brokerId: Int, producerEpoch: Short, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = { - validateGroup(groupId) match { + validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match { case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error)) case None => val group = groupManager.getGroup(groupId).getOrElse { @@ -463,7 +463,7 @@ class GroupCoordinator(val brokerId: Int, generationId: Int, offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata], responseCallback: immutable.Map[TopicPartition, Errors] => Unit) { - validateGroup(groupId) match { + validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match { case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error)) case None => groupManager.getGroup(groupId) match { @@ -524,7 +524,7 @@ class GroupCoordinator(val brokerId: Int, def handleFetchOffsets(groupId: String, partitions: Option[Seq[TopicPartition]] = None): (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = { - validateGroup(groupId) match { + validateGroupStatus(groupId, ApiKeys.OFFSET_FETCH) match { case Some(error) => error -> Map.empty case None => // return offsets blindly regardless the current group state since the group may be using @@ -543,7 +543,7 @@ class GroupCoordinator(val brokerId: Int, } def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = { - validateGroup(groupId) match { + validateGroupStatus(groupId, ApiKeys.DESCRIBE_GROUPS) match { case Some(error) => (error, GroupCoordinator.EmptyGroup) case None => groupManager.getGroup(groupId) match { @@ -563,8 +563,23 @@ class GroupCoordinator(val brokerId: Int, info(s"Removed $offsetsRemoved offsets associated with deleted partitions: ${topicPartitions.mkString(", ")}.") } - private def validateGroup(groupId: String): Option[Errors] = { - if (!validGroupId(groupId)) + private def isValidGroupId(groupId: String, api: ApiKeys): Boolean = { + api match { + case ApiKeys.OFFSET_COMMIT | ApiKeys.OFFSET_FETCH | ApiKeys.DESCRIBE_GROUPS | ApiKeys.DELETE_GROUPS => + // For backwards compatibility, we support the offset commit APIs for the empty groupId, and also + // in DescribeGroups and DeleteGroups so that users can view and delete state of all groups. + groupId != null + case _ => + // The remaining APIs are groups using Kafka for group coordination and must have a non-empty groupId + groupId != null && !groupId.isEmpty + } + } + + /** + * Check that the groupId is valid, assigned to this coordinator and that the group has been loaded. + */ + private def validateGroupStatus(groupId: String, api: ApiKeys): Option[Errors] = { + if (!isValidGroupId(groupId, api)) Some(Errors.INVALID_GROUP_ID) else if (!isActive.get) Some(Errors.COORDINATOR_NOT_AVAILABLE) @@ -648,10 +663,6 @@ class GroupCoordinator(val brokerId: Int, } } - private def validGroupId(groupId: String): Boolean = { - groupId != null && !groupId.isEmpty - } - private def joinError(memberId: String, error: Errors): JoinGroupResult = { JoinGroupResult( members = Map.empty, diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala index effa55d..4cc2837 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala @@ -60,32 +60,6 @@ class DeleteConsumerGroupTest extends ConsumerGroupCommandTest { } @Test - def testDeleteCmdInvalidGroupId() { - TestUtils.createOffsetsTopic(zkClient, servers) - val invalidGroupId = "" - - val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId) - val service = getConsumerGroupService(cgcArgs) - - val output = TestUtils.grabConsoleOutput(service.deleteGroups()) - assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group", - output.contains(s"Group '$invalidGroupId' could not be deleted due to: ${Errors.INVALID_GROUP_ID.toString}")) - } - - @Test - def testDeleteInvalidGroupId() { - TestUtils.createOffsetsTopic(zkClient, servers) - val invalidGroupId = "" - - val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId) - val service = getConsumerGroupService(cgcArgs) - - val result = service.deleteGroups() - assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group", - result.size == 1 && result.keySet.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID)) - } - - @Test def testDeleteCmdNonEmptyGroup() { TestUtils.createOffsetsTopic(zkClient, servers) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 08c13eb..933e91b 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -37,7 +37,7 @@ import org.junit.Assert._ import org.junit.{After, Assert, Before, Test} import org.scalatest.junit.JUnitSuite -import scala.collection._ +import scala.collection.mutable import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future, Promise, TimeoutException} @@ -138,7 +138,7 @@ class GroupCoordinatorTest extends JUnitSuite { val topicPartition = new TopicPartition("foo", 0) var offsetCommitErrors = Map.empty[TopicPartition, Errors] groupCoordinator.handleCommitOffsets(otherGroupId, memberId, 1, - immutable.Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors = result }) + Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors = result }) assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), offsetCommitErrors.get(topicPartition)) // Heartbeat @@ -155,7 +155,7 @@ class GroupCoordinatorTest extends JUnitSuite { assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsError) // DeleteGroups - val deleteGroupsErrors = groupCoordinator.handleDeleteGroups(immutable.Set(otherGroupId)) + val deleteGroupsErrors = groupCoordinator.handleDeleteGroups(Set(otherGroupId)) assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), deleteGroupsErrors.get(otherGroupId)) // Check that non-loading groups are still accessible @@ -452,7 +452,7 @@ class GroupCoordinatorTest extends JUnitSuite { timer.advanceClock(sessionTimeout / 2) EasyMock.reset(replicaManager) - val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, immutable.Map(tp -> offset)) + val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, Map(tp -> offset)) assertEquals(Errors.NONE, commitOffsetResult(tp)) timer.advanceClock(sessionTimeout / 2 + 100) @@ -820,7 +820,7 @@ class GroupCoordinatorTest extends JUnitSuite { val tp = new TopicPartition("topic", 0) val offset = OffsetAndMetadata(0) - val commitOffsetResult = commitOffsets(groupId, memberId, generationId, immutable.Map(tp -> offset)) + val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tp -> offset)) assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp)) } @@ -830,7 +830,7 @@ class GroupCoordinatorTest extends JUnitSuite { val offset = OffsetAndMetadata(0) val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, - OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset)) + OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset)) assertEquals(Errors.NONE, commitOffsetResult(tp)) } @@ -856,7 +856,7 @@ class GroupCoordinatorTest extends JUnitSuite { val tp = new TopicPartition("topic", 0) val offset = OffsetAndMetadata(0) val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, - OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset)) + OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset)) assertEquals(Errors.NONE, commitOffsetResult(tp)) val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) @@ -870,7 +870,7 @@ class GroupCoordinatorTest extends JUnitSuite { val offset = OffsetAndMetadata(0) val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, - OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset)) + OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset)) assertEquals(Errors.NONE, commitOffsetResult(tp)) val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) @@ -879,13 +879,51 @@ class GroupCoordinatorTest extends JUnitSuite { } @Test + def testCommitAndFetchOffsetsWithEmptyGroup() { + // For backwards compatibility, the coordinator supports committing/fetching offsets with an empty groupId. + // To allow inspection and removal of the empty group, we must also support DescribeGroups and DeleteGroups + + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + val groupId = "" + + val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, + OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset)) + assertEquals(Errors.NONE, commitOffsetResult(tp)) + + val (fetchError, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, fetchError) + assertEquals(Some(0), partitionData.get(tp).map(_.offset)) + + val (describeError, summary) = groupCoordinator.handleDescribeGroup(groupId) + assertEquals(Errors.NONE, describeError) + assertEquals(Empty.toString, summary.state) + + val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) + val partition = EasyMock.niceMock(classOf[Partition]) + + EasyMock.reset(replicaManager) + EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) + EasyMock.replay(replicaManager, partition) + + val deleteErrors = groupCoordinator.handleDeleteGroups(Set(groupId)) + assertEquals(Errors.NONE, deleteErrors(groupId)) + + val (err, data) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, err) + assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), data.get(tp).map(_.offset)) + } + + @Test def testBasicFetchTxnOffsets() { val tp = new TopicPartition("topic", 0) val offset = OffsetAndMetadata(0) val producerId = 1000L val producerEpoch : Short = 2 - val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset)) + val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tp -> offset)) assertEquals(Errors.NONE, commitOffsetResult(tp)) val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) @@ -912,7 +950,7 @@ class GroupCoordinatorTest extends JUnitSuite { val producerId = 1000L val producerEpoch : Short = 2 - val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset)) + val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tp -> offset)) assertEquals(Errors.NONE, commitOffsetResult(tp)) val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) @@ -936,7 +974,7 @@ class GroupCoordinatorTest extends JUnitSuite { val producerId = 1000L val producerEpoch : Short = 2 - val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset)) + val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tp -> offset)) assertEquals(Errors.NONE, commitOffsetResult(tp)) val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) @@ -975,16 +1013,16 @@ class GroupCoordinatorTest extends JUnitSuite { groupCoordinator.groupManager.addPartitionOwnership(offsetTopicPartitions(1).partition) val errors = mutable.ArrayBuffer[Errors]() - val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]() + val partitionData = mutable.ArrayBuffer[scala.collection.Map[TopicPartition, OffsetFetchResponse.PartitionData]]() val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]() // Ensure that the two groups map to different partitions. assertNotEquals(offsetTopicPartitions(0), offsetTopicPartitions(1)) - commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(partitions(0) -> offsets(0)))) + commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(partitions(0) -> offsets(0)))) assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0))) - commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, producerId, producerEpoch, immutable.Map(partitions(1) -> offsets(1)))) + commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, producerId, producerEpoch, Map(partitions(1) -> offsets(1)))) assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1))) // We got a commit for only one __consumer_offsets partition. We should only materialize it's group offsets. @@ -1051,16 +1089,16 @@ class GroupCoordinatorTest extends JUnitSuite { val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) val errors = mutable.ArrayBuffer[Errors]() - val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]() + val partitionData = mutable.ArrayBuffer[scala.collection.Map[TopicPartition, OffsetFetchResponse.PartitionData]]() val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]() // producer0 commits the offsets for partition0 - commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0), immutable.Map(partitions(0) -> offsets(0)))) + commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0), Map(partitions(0) -> offsets(0)))) assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0))) // producer1 commits the offsets for partition1 - commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1), immutable.Map(partitions(1) -> offsets(1)))) + commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1), Map(partitions(1) -> offsets(1)))) assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1))) // producer0 commits its transaction. @@ -1123,7 +1161,7 @@ class GroupCoordinatorTest extends JUnitSuite { assertEquals((Errors.NONE, Map.empty), groupCoordinator.handleFetchOffsets(groupId)) val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, - OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp1 -> offset1, tp2 -> offset2, tp3 -> offset3)) + OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp1 -> offset1, tp2 -> offset2, tp3 -> offset3)) assertEquals(Errors.NONE, commitOffsetResult(tp1)) assertEquals(Errors.NONE, commitOffsetResult(tp2)) assertEquals(Errors.NONE, commitOffsetResult(tp3)) @@ -1150,7 +1188,7 @@ class GroupCoordinatorTest extends JUnitSuite { assertEquals(Errors.NONE, joinGroupError) EasyMock.reset(replicaManager) - val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, immutable.Map(tp -> offset)) + val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tp -> offset)) assertEquals(Errors.REBALANCE_IN_PROGRESS, commitOffsetResult(tp)) } @@ -1332,20 +1370,20 @@ class GroupCoordinatorTest extends JUnitSuite { val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID joinGroup(groupId, memberId, protocolType, protocols) - val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet) + val result = groupCoordinator.handleDeleteGroups(Set(groupId)) assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NON_EMPTY_GROUP)) } @Test def testDeleteGroupWithInvalidGroupId() { - val invalidGroupId = "" - val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId).toSet) + val invalidGroupId = null + val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId)) assert(result.size == 1 && result.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID)) } @Test def testDeleteGroupWithWrongCoordinator() { - val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId).toSet) + val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId)) assert(result.size == 1 && result.contains(otherGroupId) && result.get(otherGroupId).contains(Errors.NOT_COORDINATOR)) } @@ -1367,7 +1405,7 @@ class GroupCoordinatorTest extends JUnitSuite { EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) EasyMock.replay(replicaManager, partition) - val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet) + val result = groupCoordinator.handleDeleteGroups(Set(groupId)) assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE)) } @@ -1388,7 +1426,7 @@ class GroupCoordinatorTest extends JUnitSuite { EasyMock.reset(replicaManager) val tp = new TopicPartition("topic", 0) val offset = OffsetAndMetadata(0) - val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, immutable.Map(tp -> offset)) + val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, Map(tp -> offset)) assertEquals(Errors.NONE, commitOffsetResult(tp)) val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId) @@ -1408,7 +1446,7 @@ class GroupCoordinatorTest extends JUnitSuite { EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition)) EasyMock.replay(replicaManager, partition) - val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet) + val result = groupCoordinator.handleDeleteGroups(Set(groupId)) assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE)) assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId)._2.state) @@ -1535,7 +1573,7 @@ class GroupCoordinatorTest extends JUnitSuite { assignment: Map[String, Array[Byte]]): Future[SyncGroupCallbackParams] = { val (responseFuture, responseCallback) = setupSyncGroupCallback - val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() + val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), EasyMock.anyShort(), @@ -1616,10 +1654,10 @@ class GroupCoordinatorTest extends JUnitSuite { private def commitOffsets(groupId: String, consumerId: String, generationId: Int, - offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = { + offsets: Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = { val (responseFuture, responseCallback) = setupCommitOffsetsCallback - val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() + val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), EasyMock.anyShort(), @@ -1646,10 +1684,10 @@ class GroupCoordinatorTest extends JUnitSuite { private def commitTransactionalOffsets(groupId: String, producerId: Long, producerEpoch: Short, - offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = { + offsets: Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = { val (responseFuture, responseCallback) = setupCommitOffsetsCallback - val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() + val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), EasyMock.anyShort(), -- To stop receiving notification emails like this one, please contact j...@apache.org.