This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7421f9d  KAFKA-6773; Allow offset commit/fetch/describe/delete with empty groupId (#4851)
7421f9d is described below

commit 7421f9dce2d2e763f70602b13df1efcd32f691b9
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Apr 11 16:47:11 2018 -0700

    KAFKA-6773; Allow offset commit/fetch/describe/delete with empty groupId (#4851)

    We had a regression in #4788 which caused the offset commit/fetch/describe
    APIs to fail if the groupId was empty. This should be allowed for backwards
    compatibility. Additionally, I have modified DeleteGroups to allow removal
    of the empty group, which was missed in the initial implementation. I've
    added a test case to ensure that we do not miss this again in the future.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>
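
For context, the client-side pattern this change keeps working is manual partition assignment with offset commit/fetch under the empty groupId. The Scala sketch below is illustrative only and is not part of this commit; the broker address, topic name, and offset value are placeholder assumptions for a consumer configured with an empty group.id:

    import java.util.{Collections, Properties}

    import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // placeholder broker address
    props.put("group.id", "")                         // empty groupId, accepted for backwards compatibility
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    val tp = new TopicPartition("topic", 0)            // placeholder topic and partition
    consumer.assign(Collections.singletonList(tp))

    // Commit and then fetch an offset for the empty group, as legacy clients have always done
    consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(0L)))
    println(consumer.committed(tp))
    consumer.close()

Deleting the empty group afterwards goes through the same DeleteGroups path that the updated GroupCoordinatorTest below exercises.
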
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  43 +++++----
 .../kafka/admin/DeleteConsumerGroupsTest.scala     |  26 ------
 .../coordinator/group/GroupCoordinatorTest.scala   | 100 ++++++++++++++-------
 3 files changed, 96 insertions(+), 73 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 225b709..cbbd913 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -27,7 +27,7 @@ import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.Time
@@ -109,7 +109,7 @@ class GroupCoordinator(val brokerId: Int,
                       protocolType: String,
                       protocols: List[(String, Array[Byte])],
                       responseCallback: JoinCallback): Unit = {
-    validateGroup(groupId).foreach { error =>
+    validateGroupStatus(groupId, ApiKeys.JOIN_GROUP).foreach { error =>
       responseCallback(joinError(memberId, error))
       return
     }
@@ -237,7 +237,7 @@ class GroupCoordinator(val brokerId: Int,
                       memberId: String,
                       groupAssignment: Map[String, Array[Byte]],
                       responseCallback: SyncCallback): Unit = {
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.SYNC_GROUP) match {
       case Some(error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS =>
         // The coordinator is loading, which means we've lost the state of the active rebalance and the
         // group will need to start over at JoinGroup. By returning rebalance in progress, the consumer
@@ -313,7 +313,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleLeaveGroup(groupId: String, memberId: String, responseCallback: Errors => Unit): Unit = {
-    validateGroup(groupId).foreach { error =>
+    validateGroupStatus(groupId, ApiKeys.LEAVE_GROUP).foreach { error =>
       responseCallback(error)
       return
     }
@@ -346,7 +346,7 @@ class GroupCoordinator(val brokerId: Int,
     var groupsEligibleForDeletion: Seq[GroupMetadata] = Seq()
 
     groupIds.foreach { groupId =>
-      validateGroup(groupId) match {
+      validateGroupStatus(groupId, ApiKeys.DELETE_GROUPS) match {
         case Some(error) =>
           groupErrors += groupId -> error
 
@@ -386,7 +386,7 @@ class GroupCoordinator(val brokerId: Int,
                       memberId: String,
                       generationId: Int,
                       responseCallback: Errors => Unit) {
-    validateGroup(groupId).foreach { error =>
+    validateGroupStatus(groupId, ApiKeys.HEARTBEAT).foreach { error =>
       if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS)
         // the group is still loading, so respond just blindly
         responseCallback(Errors.NONE)
@@ -448,7 +448,7 @@ class GroupCoordinator(val brokerId: Int,
                              producerEpoch: Short,
                              offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                              responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
       case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
       case None =>
         val group = groupManager.getGroup(groupId).getOrElse {
@@ -463,7 +463,7 @@ class GroupCoordinator(val brokerId: Int,
                           generationId: Int,
                            offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                            responseCallback: immutable.Map[TopicPartition, Errors] => Unit) {
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
       case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
       case None =>
         groupManager.getGroup(groupId) match {
@@ -524,7 +524,7 @@ class GroupCoordinator(val brokerId: Int,
   def handleFetchOffsets(groupId: String, partitions: Option[Seq[TopicPartition]] = None):
   (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
 
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.OFFSET_FETCH) match {
       case Some(error) => error -> Map.empty
       case None =>
         // return offsets blindly regardless the current group state since the group may be using
@@ -543,7 +543,7 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleDescribeGroup(groupId: String): (Errors, GroupSummary) = {
-    validateGroup(groupId) match {
+    validateGroupStatus(groupId, ApiKeys.DESCRIBE_GROUPS) match {
       case Some(error) => (error, GroupCoordinator.EmptyGroup)
       case None =>
         groupManager.getGroup(groupId) match {
@@ -563,8 +563,23 @@ class GroupCoordinator(val brokerId: Int,
     info(s"Removed $offsetsRemoved offsets associated with deleted partitions: ${topicPartitions.mkString(", ")}.")
   }
 
-  private def validateGroup(groupId: String): Option[Errors] = {
-    if (!validGroupId(groupId))
+  private def isValidGroupId(groupId: String, api: ApiKeys): Boolean = {
+    api match {
+      case ApiKeys.OFFSET_COMMIT | ApiKeys.OFFSET_FETCH | ApiKeys.DESCRIBE_GROUPS | ApiKeys.DELETE_GROUPS =>
+        // For backwards compatibility, we support the offset commit APIs for the empty groupId, and also
+        // in DescribeGroups and DeleteGroups so that users can view and delete state of all groups.
+        groupId != null
+      case _ =>
+        // The remaining APIs are groups using Kafka for group coordination and must have a non-empty groupId
+        groupId != null && !groupId.isEmpty
+    }
+  }
+
+  /**
+   * Check that the groupId is valid, assigned to this coordinator and that the group has been loaded.
+   */
+  private def validateGroupStatus(groupId: String, api: ApiKeys): Option[Errors] = {
+    if (!isValidGroupId(groupId, api))
       Some(Errors.INVALID_GROUP_ID)
     else if (!isActive.get)
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
@@ -648,10 +663,6 @@ class GroupCoordinator(val brokerId: Int,
     }
   }
 
-  private def validGroupId(groupId: String): Boolean = {
-    groupId != null && !groupId.isEmpty
-  }
-
   private def joinError(memberId: String, error: Errors): JoinGroupResult = {
     JoinGroupResult(
       members = Map.empty,
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
index effa55d..4cc2837 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteConsumerGroupsTest.scala
@@ -60,32 +60,6 @@ class DeleteConsumerGroupTest extends ConsumerGroupCommandTest {
   }
 
   @Test
-  def testDeleteCmdInvalidGroupId() {
-    TestUtils.createOffsetsTopic(zkClient, servers)
-    val invalidGroupId = ""
-
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
-    val service = getConsumerGroupService(cgcArgs)
-
-    val output = TestUtils.grabConsoleOutput(service.deleteGroups())
-    assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group",
-      output.contains(s"Group '$invalidGroupId' could not be deleted due to: ${Errors.INVALID_GROUP_ID.toString}"))
-  }
-
-  @Test
-  def testDeleteInvalidGroupId() {
-    TestUtils.createOffsetsTopic(zkClient, servers)
-    val invalidGroupId = ""
-
-    val cgcArgs = Array("--bootstrap-server", brokerList, "--delete", "--group", invalidGroupId)
-    val service = getConsumerGroupService(cgcArgs)
-
-    val result = service.deleteGroups()
-    assertTrue(s"The expected error (${Errors.INVALID_GROUP_ID}) was not detected while deleting consumer group",
-      result.size == 1 && result.keySet.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
-  }
-
-  @Test
   def testDeleteCmdNonEmptyGroup() {
     TestUtils.createOffsetsTopic(zkClient, servers)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 08c13eb..933e91b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -37,7 +37,7 @@ import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
 import org.scalatest.junit.JUnitSuite
 
-import scala.collection._
+import scala.collection.mutable
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future, Promise, TimeoutException}
 
@@ -138,7 +138,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val topicPartition = new TopicPartition("foo", 0)
     var offsetCommitErrors = Map.empty[TopicPartition, Errors]
     groupCoordinator.handleCommitOffsets(otherGroupId, memberId, 1,
-      immutable.Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors = result })
+      Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors = result })
     assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), offsetCommitErrors.get(topicPartition))
 
     // Heartbeat
@@ -155,7 +155,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, listGroupsError)
 
     // DeleteGroups
-    val deleteGroupsErrors = groupCoordinator.handleDeleteGroups(immutable.Set(otherGroupId))
+    val deleteGroupsErrors = groupCoordinator.handleDeleteGroups(Set(otherGroupId))
     assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), deleteGroupsErrors.get(otherGroupId))
 
     // Check that non-loading groups are still accessible
@@ -452,7 +452,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     timer.advanceClock(sessionTimeout / 2)
 
     EasyMock.reset(replicaManager)
-    val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     timer.advanceClock(sessionTimeout / 2 + 100)
@@ -820,7 +820,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
 
-    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tp -> offset))
     assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
   }
 
@@ -830,7 +830,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offset = OffsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
   }
 
@@ -856,7 +856,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -870,7 +870,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offset = OffsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -879,13 +879,51 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
+  def testCommitAndFetchOffsetsWithEmptyGroup() {
+    // For backwards compatibility, the coordinator supports committing/fetching offsets with an empty groupId.
+    // To allow inspection and removal of the empty group, we must also support DescribeGroups and DeleteGroups
+
+    val tp = new TopicPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+    val groupId = ""
+
+    val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+    val (fetchError, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, fetchError)
+    assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+
+    val (describeError, summary) = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Errors.NONE, describeError)
+    assertEquals(Empty.toString, summary.state)
+
+    val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val partition = EasyMock.niceMock(classOf[Partition])
+
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
+    EasyMock.expect(replicaManager.getPartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
+    EasyMock.replay(replicaManager, partition)
+
+    val deleteErrors = groupCoordinator.handleDeleteGroups(Set(groupId))
+    assertEquals(Errors.NONE, deleteErrors(groupId))
+
+    val (err, data) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, err)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), data.get(tp).map(_.offset))
+  }
+
+  @Test
   def testBasicFetchTxnOffsets() {
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -912,7 +950,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -936,7 +974,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
@@ -975,16 +1013,16 @@ class GroupCoordinatorTest extends JUnitSuite {
 
     groupCoordinator.groupManager.addPartitionOwnership(offsetTopicPartitions(1).partition)
     val errors = mutable.ArrayBuffer[Errors]()
-    val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
+    val partitionData = mutable.ArrayBuffer[scala.collection.Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
 
     val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]()
 
     // Ensure that the two groups map to different partitions.
     assertNotEquals(offsetTopicPartitions(0), offsetTopicPartitions(1))
 
-    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(partitions(0) -> offsets(0))))
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, producerEpoch, Map(partitions(0) -> offsets(0))))
     assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
-    commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, producerId, producerEpoch, immutable.Map(partitions(1) -> offsets(1))))
+    commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, producerId, producerEpoch, Map(partitions(1) -> offsets(1))))
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // We got a commit for only one __consumer_offsets partition. We should only materialize it's group offsets.
@@ -1051,16 +1089,16 @@ class GroupCoordinatorTest extends JUnitSuite {
     val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
 
     val errors = mutable.ArrayBuffer[Errors]()
-    val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
+    val partitionData = mutable.ArrayBuffer[scala.collection.Map[TopicPartition, OffsetFetchResponse.PartitionData]]()
 
     val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]()
 
     // producer0 commits the offsets for partition0
-    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0), immutable.Map(partitions(0) -> offsets(0))))
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0), Map(partitions(0) -> offsets(0))))
     assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
 
     // producer1 commits the offsets for partition1
-    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1), immutable.Map(partitions(1) -> offsets(1))))
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1), Map(partitions(1) -> offsets(1))))
     assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
 
     // producer0 commits its transaction.
@@ -1123,7 +1161,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals((Errors.NONE, Map.empty), groupCoordinator.handleFetchOffsets(groupId))
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp1 -> offset1, tp2 -> offset2, tp3 -> offset3))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp1 -> offset1, tp2 -> offset2, tp3 -> offset3))
     assertEquals(Errors.NONE, commitOffsetResult(tp1))
     assertEquals(Errors.NONE, commitOffsetResult(tp2))
     assertEquals(Errors.NONE, commitOffsetResult(tp3))
@@ -1150,7 +1188,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, joinGroupError)
 
     EasyMock.reset(replicaManager)
-    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, generationId, Map(tp -> offset))
     assertEquals(Errors.REBALANCE_IN_PROGRESS, commitOffsetResult(tp))
   }
 
@@ -1332,20 +1370,20 @@ class GroupCoordinatorTest extends JUnitSuite {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     joinGroup(groupId, memberId, protocolType, protocols)
 
-    val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+    val result = groupCoordinator.handleDeleteGroups(Set(groupId))
     assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NON_EMPTY_GROUP))
   }
 
   @Test
   def testDeleteGroupWithInvalidGroupId() {
-    val invalidGroupId = ""
-    val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId).toSet)
+    val invalidGroupId = null
+    val result = groupCoordinator.handleDeleteGroups(Set(invalidGroupId))
     assert(result.size == 1 && result.contains(invalidGroupId) && result.get(invalidGroupId).contains(Errors.INVALID_GROUP_ID))
   }
 
   @Test
   def testDeleteGroupWithWrongCoordinator() {
-    val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId).toSet)
+    val result = groupCoordinator.handleDeleteGroups(Set(otherGroupId))
     assert(result.size == 1 && result.contains(otherGroupId) && result.get(otherGroupId).contains(Errors.NOT_COORDINATOR))
   }
 
@@ -1367,7 +1405,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
-    val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+    val result = groupCoordinator.handleDeleteGroups(Set(groupId))
     assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))
   }
 
@@ -1388,7 +1426,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     EasyMock.reset(replicaManager)
     val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
-    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, immutable.Map(tp -> offset))
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId)
@@ -1408,7 +1446,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     EasyMock.expect(replicaManager.nonOfflinePartition(groupTopicPartition)).andStubReturn(Some(partition))
     EasyMock.replay(replicaManager, partition)
 
-    val result = groupCoordinator.handleDeleteGroups(Set(groupId).toSet)
+    val result = groupCoordinator.handleDeleteGroups(Set(groupId))
     assert(result.size == 1 && result.contains(groupId) && result.get(groupId).contains(Errors.NONE))
 
     assertEquals(Dead.toString, groupCoordinator.handleDescribeGroup(groupId)._2.state)
@@ -1535,7 +1573,7 @@ class GroupCoordinatorTest extends JUnitSuite {
                                   assignment: Map[String, Array[Byte]]): Future[SyncGroupCallbackParams] = {
     val (responseFuture, responseCallback) = setupSyncGroupCallback
 
-    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
@@ -1616,10 +1654,10 @@ class GroupCoordinatorTest extends JUnitSuite {
   private def commitOffsets(groupId: String,
                             consumerId: String,
                             generationId: Int,
-                            offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
+                            offsets: Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
-    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),
@@ -1646,10 +1684,10 @@ class GroupCoordinatorTest extends JUnitSuite {
   private def commitTransactionalOffsets(groupId: String,
                                          producerId: Long,
                                          producerEpoch: Short,
-                                         offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
+                                         offsets: Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
-    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+    val capturedArgument: Capture[scala.collection.Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
 
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
       EasyMock.anyShort(),

-- 
To stop receiving notification emails like this one, please contact j...@apache.org.
