dajac commented on code in PR #14537:
URL: https://github.com/apache/kafka/pull/14537#discussion_r1371838869


##########
core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import 
org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, 
DescribedGroupMember}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class DescribeGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),

Review Comment:
   I suppose that we don't need this in this case.



##########
core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import 
org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, 
DescribedGroupMember}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class DescribeGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testDescribeGroups()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testDescribeGroups()
+  }
+
+  private def testDescribeGroups(): Unit = {
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Join the consumer group. Note that we don't heartbeat here so we must 
use
+    // a session long enough for the duration of the test.
+    val (memberId1, _) = joinConsumerGroupWithOldProtocol(
+      groupId = "grp-1",
+      metadata = Array(1, 2, 3)
+    )
+    val (memberId2, _) = joinConsumerGroupWithOldProtocol(
+      groupId = "grp-2",
+      metadata = Array(1, 2, 3),
+      completeRebalance = false
+    )
+
+    for (version <- ApiKeys.DESCRIBE_GROUPS.oldestVersion() to 
ApiKeys.DESCRIBE_GROUPS.latestVersion(isUnstableApiEnabled)) {
+      assertEquals(
+        List(
+          new DescribedGroup()
+            .setGroupId("grp-1")
+            .setGroupState(GenericGroupState.STABLE.toString)
+            .setProtocolType("consumer")
+            .setProtocolData("consumer-range")
+            .setMembers(List(
+              new DescribedGroupMember()
+                .setMemberId(memberId1)
+                .setGroupInstanceId(null)
+                .setClientId("client-id")
+                .setClientHost("/127.0.0.1")
+                .setMemberMetadata(Array(1, 2, 3))
+            ).asJava),
+          new DescribedGroup()
+            .setGroupId("grp-2")
+            .setGroupState(GenericGroupState.COMPLETING_REBALANCE.toString)
+            .setProtocolType("consumer")
+            .setMembers(List(
+              new DescribedGroupMember()
+                .setMemberId(memberId2)
+                .setGroupInstanceId(null)
+                .setClientId("client-id")
+                .setClientHost("/127.0.0.1")
+            ).asJava),
+          new DescribedGroup()

Review Comment:
   nit: Should we add a comment for this case? It is not obvious that we should 
return a Dead group when the group does not exist.



##########
core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import 
org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, 
DescribedGroupMember}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class DescribeGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testDescribeGroups()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testDescribeGroups()
+  }
+
+  private def testDescribeGroups(): Unit = {
+

Review Comment:
   nit: We usually don't put an empty line after the method declaration.



##########
core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import 
org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, 
DescribedGroupMember}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class DescribeGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testDescribeGroups()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala:
##########
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.message.ListGroupsResponseData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import 
org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    testListGroupsWithNewProtocol()
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    testListGroupsWithOldProtocol()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+    testListGroupsWithOldProtocol()
+  }
+
+  private def testListGroupsWithNewProtocol(): Unit = {
+    if (!isNewGroupCoordinatorEnabled) {
+      fail("Cannot use the new protocol with the old group coordinator.")
+    }
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to 
ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) {
+

Review Comment:
   nit: empty line.



##########
core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala:
##########
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.message.ListGroupsResponseData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import 
org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    testListGroupsWithNewProtocol()
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    testListGroupsWithOldProtocol()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+    testListGroupsWithOldProtocol()
+  }
+
+  private def testListGroupsWithNewProtocol(): Unit = {
+    if (!isNewGroupCoordinatorEnabled) {
+      fail("Cannot use the new protocol with the old group coordinator.")
+    }
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to 
ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) {
+
+      // Join the consumer group. Note that we don't heartbeat here so we must 
use
+      // a session long enough for the duration of the test.
+      val (memberId1, _) = joinConsumerGroup("grp", true)
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.STABLE.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.ASSIGNING.toString),
+        version = version
+      )
+
+      // Member 2 joins the group, triggering a rebalance.
+      val (memberId2, _) = joinConsumerGroup("grp", true)
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.RECONCILING.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.STABLE.toString),
+        version = version
+      )
+
+      leaveGroup(
+        groupId = "grp",
+        memberId = memberId1,
+        useNewProtocol = true
+      )
+      leaveGroup(
+        groupId = "grp",
+        memberId = memberId2,
+        useNewProtocol = true
+      )
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.EMPTY.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.RECONCILING.toString),
+        version = version
+      )

Review Comment:
   I find this construct a bit weird because it mixes two things into one. It may 
be better to separate them: one way would be to list the groups and then assert 
the returned list.



##########
core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala:
##########
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.message.ListGroupsResponseData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import 
org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    testListGroupsWithNewProtocol()
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    testListGroupsWithOldProtocol()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+    testListGroupsWithOldProtocol()
+  }
+
+  private def testListGroupsWithNewProtocol(): Unit = {
+    if (!isNewGroupCoordinatorEnabled) {
+      fail("Cannot use the new protocol with the old group coordinator.")
+    }
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to 
ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) {
+
+      // Join the consumer group. Note that we don't heartbeat here so we must 
use
+      // a session long enough for the duration of the test.
+      val (memberId1, _) = joinConsumerGroup("grp", true)
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.STABLE.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.ASSIGNING.toString),
+        version = version
+      )
+
+      // Member 2 joins the group, triggering a rebalance.
+      val (memberId2, _) = joinConsumerGroup("grp", true)
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.RECONCILING.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.STABLE.toString),
+        version = version
+      )
+
+      leaveGroup(
+        groupId = "grp",
+        memberId = memberId1,
+        useNewProtocol = true
+      )
+      leaveGroup(
+        groupId = "grp",
+        memberId = memberId2,
+        useNewProtocol = true
+      )
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.EMPTY.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.RECONCILING.toString),
+        version = version
+      )
+
+      deleteGroups(
+        groupIds = List("grp"),
+        expectedErrors = List(Errors.NONE),
+        version = ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled)
+      )
+
+      assertEquals(
+        List.empty,
+        listGroups(
+          statesFilter = List.empty,
+          version = version.toShort
+        )
+      )
+    }
+  }
+
+  private def testListGroupsWithOldProtocol(): Unit = {
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to 
ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) {
+

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.junit.jupiter.api.Assertions.fail
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetDeleteRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testOffsetDelete(true)
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testOffsetDelete(false)
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testOffsetDelete(false)
+  }
+
+  private def testOffsetDelete(useNewProtocol: Boolean): Unit = {
+    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+      fail("Cannot use the new protocol with the old group coordinator.")
+    }
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    for (version <- ApiKeys.OFFSET_DELETE.oldestVersion() to 
ApiKeys.OFFSET_DELETE.latestVersion(isUnstableApiEnabled)) {
+

Review Comment:
   nit: empty line.



##########
core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.junit.jupiter.api.Assertions.fail
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetDeleteRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testOffsetDelete(true)
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testOffsetDelete(false)
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testOffsetDelete(false)
+  }
+
+  private def testOffsetDelete(useNewProtocol: Boolean): Unit = {
+    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+      fail("Cannot use the new protocol with the old group coordinator.")
+    }
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    for (version <- ApiKeys.OFFSET_DELETE.oldestVersion() to 
ApiKeys.OFFSET_DELETE.latestVersion(isUnstableApiEnabled)) {
+
+      // Join the consumer group. Note that we don't heartbeat here so we must 
use
+      // a session long enough for the duration of the test.
+      val (memberId, memberEpoch) = joinConsumerGroup(
+        groupId = "grp",
+        useNewProtocol = useNewProtocol
+      )
+
+      // Commit offsets.
+      for (partitionId <- 0 to 2) {
+        commitOffset(
+          groupId = "grp",
+          memberId = memberId,
+          memberEpoch = memberEpoch,
+          topic = "foo",
+          partition = partitionId,
+          offset = 100L + partitionId,
+          expectedError = Errors.NONE,
+          version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+        )
+      }
+
+      // Delete offset with topic that the group is subscribed to.
+      deleteOffset(
+        groupId = "grp",
+        topic = "foo",
+        partition = 0,
+        expectedPartitionError = Errors.GROUP_SUBSCRIBED_TO_TOPIC,
+        version = version.toShort
+      )
+
+      // Unsubscribe the topic.
+      if (useNewProtocol) {
+        consumerGroupHeartbeat(
+          groupId = "grp",
+          memberId = memberId,
+          memberEpoch = memberEpoch,
+          subscribedTopicNames = List.empty
+        )
+      } else {
+        leaveGroup(
+          groupId = "grp",
+          memberId = memberId,
+          useNewProtocol = false
+        )
+      }
+
+      // Delete offsets.
+      for (partitionId <- 0 to 1) {
+        deleteOffset(
+          groupId = "grp",
+          topic = "foo",
+          partition = partitionId,
+          version = version.toShort
+        )
+      }
+
+      // Delete offsets with partition that doesn't exist.
+      deleteOffset(
+        groupId = "grp",
+        topic = "foo",
+        partition = 5,
+        expectedPartitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION,
+        version = version.toShort
+      )
+
+      // Delete offset with unknown group id.
+      deleteOffset(

Review Comment:
   Could we also add the case that led to 
https://github.com/apache/kafka/pull/14589? Could we also update the 
OffsetFetch integration tests while we are here?



##########
core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import 
org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, 
DescribedGroupMember}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class DescribeGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testDescribeGroups()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testDescribeGroups()
+  }
+
+  private def testDescribeGroups(): Unit = {
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Join the consumer group. Note that we don't heartbeat here so we must 
use
+    // a session long enough for the duration of the test.
+    val (memberId1, _) = joinConsumerGroupWithOldProtocol(
+      groupId = "grp-1",
+      metadata = Array(1, 2, 3)
+    )
+    val (memberId2, _) = joinConsumerGroupWithOldProtocol(
+      groupId = "grp-2",
+      metadata = Array(1, 2, 3),
+      completeRebalance = false
+    )
+
+    for (version <- ApiKeys.DESCRIBE_GROUPS.oldestVersion() to 
ApiKeys.DESCRIBE_GROUPS.latestVersion(isUnstableApiEnabled)) {
+      assertEquals(
+        List(
+          new DescribedGroup()
+            .setGroupId("grp-1")
+            .setGroupState(GenericGroupState.STABLE.toString)
+            .setProtocolType("consumer")
+            .setProtocolData("consumer-range")
+            .setMembers(List(
+              new DescribedGroupMember()
+                .setMemberId(memberId1)
+                .setGroupInstanceId(null)
+                .setClientId("client-id")
+                .setClientHost("/127.0.0.1")
+                .setMemberMetadata(Array(1, 2, 3))

Review Comment:
   Could we also verify memberAssignment?



##########
core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala:
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import 
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class DeleteGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testDeleteGroups(true)
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDeleteGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testDeleteGroups(false)
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDeleteGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testDeleteGroups(false)
+  }
+
+  private def testDeleteGroups(useNewProtocol: Boolean): Unit = {
+    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+      fail("Cannot use the new protocol with the old group coordinator.")
+    }
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Join the consumer group. Note that we don't heartbeat here so we must 
use
+    // a session long enough for the duration of the test.
+    // We test DeleteGroups on empty and non-empty groups. Here we create the 
non-empty group.
+    joinConsumerGroup(
+      groupId = "grp-non-empty",
+      useNewProtocol = useNewProtocol
+    )
+
+    for (version <- ApiKeys.DELETE_GROUPS.oldestVersion() to 
ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled)) {
+
+      // Join the consumer group. Note that we don't heartbeat here so we must 
use
+      // a session long enough for the duration of the test.
+      val (memberId, memberEpoch) = joinConsumerGroup(
+        groupId = "grp",
+        useNewProtocol = useNewProtocol
+      )
+      leaveGroup(

Review Comment:
   nit: Let's add an empty line and a comment explaining why we need to leave 
the group.



##########
core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala:
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import 
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class DeleteGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testDeleteGroups(true)
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDeleteGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testDeleteGroups(false)
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDeleteGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testDeleteGroups(false)
+  }
+
+  private def testDeleteGroups(useNewProtocol: Boolean): Unit = {
+    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+      fail("Cannot use the new protocol with the old group coordinator.")
+    }
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Join the consumer group. Note that we don't heartbeat here so we must 
use
+    // a session long enough for the duration of the test.
+    // We test DeleteGroups on empty and non-empty groups. Here we create the 
non-empty group.
+    joinConsumerGroup(
+      groupId = "grp-non-empty",
+      useNewProtocol = useNewProtocol
+    )
+
+    for (version <- ApiKeys.DELETE_GROUPS.oldestVersion() to 
ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled)) {
+

Review Comment:
   nit: We could remove this empty line.



##########
core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import 
org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, 
DescribedGroupMember}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class DescribeGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testDescribeGroups()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testDescribeGroups()
+  }
+
+  private def testDescribeGroups(): Unit = {
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Join the consumer group. Note that we don't heartbeat here so we must 
use
+    // a session long enough for the duration of the test.
+    val (memberId1, _) = joinConsumerGroupWithOldProtocol(
+      groupId = "grp-1",
+      metadata = Array(1, 2, 3)
+    )
+    val (memberId2, _) = joinConsumerGroupWithOldProtocol(
+      groupId = "grp-2",
+      metadata = Array(1, 2, 3),
+      completeRebalance = false
+    )
+
+    for (version <- ApiKeys.DESCRIBE_GROUPS.oldestVersion() to 
ApiKeys.DESCRIBE_GROUPS.latestVersion(isUnstableApiEnabled)) {
+      assertEquals(
+        List(
+          new DescribedGroup()
+            .setGroupId("grp-1")
+            .setGroupState(GenericGroupState.STABLE.toString)
+            .setProtocolType("consumer")
+            .setProtocolData("consumer-range")
+            .setMembers(List(
+              new DescribedGroupMember()
+                .setMemberId(memberId1)
+                .setGroupInstanceId(null)
+                .setClientId("client-id")
+                .setClientHost("/127.0.0.1")
+                .setMemberMetadata(Array(1, 2, 3))
+            ).asJava),
+          new DescribedGroup()
+            .setGroupId("grp-2")
+            .setGroupState(GenericGroupState.COMPLETING_REBALANCE.toString)
+            .setProtocolType("consumer")
+            .setMembers(List(
+              new DescribedGroupMember()
+                .setMemberId(memberId2)
+                .setGroupInstanceId(null)
+                .setClientId("client-id")
+                .setClientHost("/127.0.0.1")

Review Comment:
   Should we assert both member metadata and assignment?



##########
core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala:
##########
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import 
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class DeleteGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testDeleteGroups(true)
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDeleteGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testDeleteGroups(false)
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDeleteGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testDeleteGroups(false)
+  }
+
+  private def testDeleteGroups(useNewProtocol: Boolean): Unit = {
+    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+      fail("Cannot use the new protocol with the old group coordinator.")
+    }
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Join the consumer group. Note that we don't heartbeat here so we must 
use
+    // a session long enough for the duration of the test.
+    // We test DeleteGroups on empty and non-empty groups. Here we create the 
non-empty group.
+    joinConsumerGroup(
+      groupId = "grp-non-empty",
+      useNewProtocol = useNewProtocol
+    )
+
+    for (version <- ApiKeys.DELETE_GROUPS.oldestVersion() to 
ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled)) {
+
+      // Join the consumer group. Note that we don't heartbeat here so we must 
use
+      // a session long enough for the duration of the test.
+      val (memberId, memberEpoch) = joinConsumerGroup(
+        groupId = "grp",
+        useNewProtocol = useNewProtocol
+      )
+      leaveGroup(
+        groupId = "grp",
+        memberId = memberId,
+        useNewProtocol = useNewProtocol
+      )
+
+      deleteGroups(
+        groupIds = List("grp-non-empty", "grp"),
+        expectedErrors = List(Errors.NON_EMPTY_GROUP, Errors.NONE),
+        version = version.toShort
+      )
+
+      if (useNewProtocol) {
+        commitOffset(
+          groupId = "grp",
+          memberId = memberId,
+          memberEpoch = memberEpoch,
+          topic = "foo",
+          partition = 0,
+          offset = 100L,
+          expectedError = Errors.GROUP_ID_NOT_FOUND,
+          version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+        )
+      } else {
+        assertEquals(
+          List(new DescribedGroup()
+            .setGroupId("grp")
+            .setGroupState(GenericGroupState.DEAD.toString)
+          ),
+          describeGroups(
+            groupIds = List("grp"),
+            version = version.toShort

Review Comment:
   Using `version.toShort` seems incorrect here, doesn't it? The loop variable iterates over DeleteGroups versions, but this call is a DescribeGroups request, which has its own version range.



##########
core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala:
##########
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import 
org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, 
DescribedGroupMember}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class DescribeGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testDescribeGroups()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testDescribeGroups()
+  }
+
+  private def testDescribeGroups(): Unit = {
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    // Join the consumer group. Note that we don't heartbeat here so we must 
use
+    // a session long enough for the duration of the test.
+    val (memberId1, _) = joinConsumerGroupWithOldProtocol(
+      groupId = "grp-1",
+      metadata = Array(1, 2, 3)
+    )
+    val (memberId2, _) = joinConsumerGroupWithOldProtocol(
+      groupId = "grp-2",
+      metadata = Array(1, 2, 3),
+      completeRebalance = false
+    )

Review Comment:
   nit: Let's update the first comment and add one for the second member to 
explain why we don't complete the rebalance.



##########
core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala:
##########
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.message.ListGroupsResponseData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import 
org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    testListGroupsWithNewProtocol()
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    testListGroupsWithOldProtocol()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+    testListGroupsWithOldProtocol()
+  }
+
+  private def testListGroupsWithNewProtocol(): Unit = {
+    if (!isNewGroupCoordinatorEnabled) {
+      fail("Cannot use the new protocol with the old group coordinator.")
+    }
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to 
ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) {
+
+      // Join the consumer group. Note that we don't heartbeat here so we must 
use
+      // a session long enough for the duration of the test.
+      val (memberId1, _) = joinConsumerGroup("grp", true)
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.STABLE.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.ASSIGNING.toString),
+        version = version
+      )
+
+      // Member 2 joins the group, triggering a rebalance.
+      val (memberId2, _) = joinConsumerGroup("grp", true)
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.RECONCILING.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.STABLE.toString),
+        version = version
+      )
+
+      leaveGroup(
+        groupId = "grp",
+        memberId = memberId1,
+        useNewProtocol = true
+      )
+      leaveGroup(
+        groupId = "grp",
+        memberId = memberId2,
+        useNewProtocol = true
+      )
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.EMPTY.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.RECONCILING.toString),
+        version = version
+      )
+
+      deleteGroups(
+        groupIds = List("grp"),
+        expectedErrors = List(Errors.NONE),
+        version = ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled)
+      )
+
+      assertEquals(
+        List.empty,
+        listGroups(
+          statesFilter = List.empty,
+          version = version.toShort
+        )
+      )
+    }
+  }

Review Comment:
   I wonder if we should have both old and new groups when we test the new group 
coordinator. What do you think?



##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -220,31 +292,92 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     joinGroupResponse = connectAndReceive[JoinGroupResponse](joinGroupRequest)
     assertEquals(Errors.NONE.code, joinGroupResponse.data.errorCode)
 
-    val syncGroupRequestData = new SyncGroupRequestData()
-      .setGroupId(groupId)
-      .setMemberId(joinGroupResponse.data.memberId)
-      .setGenerationId(joinGroupResponse.data.generationId)
-      .setProtocolType("consumer")
-      .setProtocolName("consumer-range")
-      .setAssignments(List.empty.asJava)
-
-    // Send the sync group request to complete the rebalance.
-    val syncGroupRequest = new 
SyncGroupRequest.Builder(syncGroupRequestData).build()
-    val syncGroupResponse = 
connectAndReceive[SyncGroupResponse](syncGroupRequest)
-    assertEquals(Errors.NONE.code, syncGroupResponse.data.errorCode)
+    if (completeRebalance) {
+      // Send the sync group request to complete the rebalance.
+      syncGroupWithOldProtocol(
+        groupId = groupId,
+        memberId = joinGroupResponse.data.memberId(),
+        generationId = joinGroupResponse.data.generationId()
+      )
+    }
 
     (joinGroupResponse.data.memberId, joinGroupResponse.data.generationId)
   }
 
   protected def joinConsumerGroupWithNewProtocol(groupId: String): (String, 
Int) = {
-    // Heartbeat request to join the group.
+    val consumerGroupHeartbeatResponseData = consumerGroupHeartbeat(
+      groupId = groupId,
+      rebalanceTimeoutMs = 5 * 60 * 1000,
+      subscribedTopicNames = List("foo"),
+      topicPartitions = List.empty
+    )
+    (consumerGroupHeartbeatResponseData.memberId, 
consumerGroupHeartbeatResponseData.memberEpoch)
+  }
+
+  protected def joinConsumerGroup(groupId: String, useNewProtocol: Boolean): 
(String, Int) = {
+    if (useNewProtocol) {
+      // Note that we heartbeat only once to join the group and assume
+      // that the test will complete within the session timeout.
+      joinConsumerGroupWithNewProtocol(groupId)
+    } else {
+      // Note that we don't heartbeat and assume that the test will
+      // complete within the session timeout.
+      joinConsumerGroupWithOldProtocol(groupId)
+    }
+  }
+
+  protected def listGroups(
+    statesFilter: List[String],
+    version: Short
+  ): List[ListGroupsResponseData.ListedGroup] = {
+    val request = new ListGroupsRequest.Builder(
+      new ListGroupsRequestData()
+        .setStatesFilter(statesFilter.asJava)
+    ).build(version)
+
+    val response = connectAndReceive[ListGroupsResponse](request)
+
+    response.data().groups.asScala.toList
+  }
+
+  protected def describeGroups(
+    groupIds: List[String],
+    version: Short
+  ): List[DescribeGroupsResponseData.DescribedGroup] = {
+    val describeGroupsRequest = new DescribeGroupsRequest.Builder(
+      new DescribeGroupsRequestData()
+        .setGroups(groupIds.asJava)
+    ).build(version)
+
+    val describeGroupsResponse = 
connectAndReceive[DescribeGroupsResponse](describeGroupsRequest)
+
+    describeGroupsResponse.data().groups().asScala.toList

Review Comment:
   nit: In Scala, we don't have to put `()` for getters so we usually don't put 
them in our code base. Here you can use 
`describeGroupsResponse.data.groups.asScala.toList`.
   
   There are other cases where you could remove them. Could you please look at 
them?



##########
core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala:
##########
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.message.ListGroupsResponseData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import 
org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    testListGroupsWithNewProtocol()
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    testListGroupsWithOldProtocol()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+    testListGroupsWithOldProtocol()
+  }
+
+  private def testListGroupsWithNewProtocol(): Unit = {
+    if (!isNewGroupCoordinatorEnabled) {
+      fail("Cannot use the new protocol with the old group coordinator.")
+    }
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to 
ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) {
+
+      // Join the consumer group. Note that we don't heartbeat here so we must 
use
+      // a session long enough for the duration of the test.
+      val (memberId1, _) = joinConsumerGroup("grp", true)
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.STABLE.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.ASSIGNING.toString),
+        version = version
+      )
+
+      // Member 2 joins the group, triggering a rebalance.
+      val (memberId2, _) = joinConsumerGroup("grp", true)
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.RECONCILING.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.STABLE.toString),
+        version = version
+      )
+
+      leaveGroup(
+        groupId = "grp",
+        memberId = memberId1,
+        useNewProtocol = true
+      )
+      leaveGroup(
+        groupId = "grp",
+        memberId = memberId2,
+        useNewProtocol = true
+      )
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.EMPTY.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.RECONCILING.toString),
+        version = version
+      )
+
+      deleteGroups(
+        groupIds = List("grp"),
+        expectedErrors = List(Errors.NONE),
+        version = ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled)
+      )
+
+      assertEquals(
+        List.empty,
+        listGroups(
+          statesFilter = List.empty,
+          version = version.toShort
+        )
+      )
+    }
+  }
+
+  private def testListGroupsWithOldProtocol(): Unit = {
+

Review Comment:
   nit: Empty line.



##########
core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala:
##########
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.junit.jupiter.api.Assertions.fail
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class OffsetDeleteRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testOffsetDelete(true)
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithOldConsumerGroupProtocolAndNewGroupCoordinator(): 
Unit = {
+    testOffsetDelete(false)
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testOffsetDeleteWithOldConsumerGroupProtocolAndOldGroupCoordinator(): 
Unit = {
+    testOffsetDelete(false)
+  }
+
+  private def testOffsetDelete(useNewProtocol: Boolean): Unit = {
+    if (useNewProtocol && !isNewGroupCoordinatorEnabled) {
+      fail("Cannot use the new protocol with the old group coordinator.")
+    }
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    for (version <- ApiKeys.OFFSET_DELETE.oldestVersion() to 
ApiKeys.OFFSET_DELETE.latestVersion(isUnstableApiEnabled)) {
+
+      // Join the consumer group. Note that we don't heartbeat here so we must 
use
+      // a session long enough for the duration of the test.
+      val (memberId, memberEpoch) = joinConsumerGroup(
+        groupId = "grp",
+        useNewProtocol = useNewProtocol
+      )
+
+      // Commit offsets.
+      for (partitionId <- 0 to 2) {
+        commitOffset(
+          groupId = "grp",
+          memberId = memberId,
+          memberEpoch = memberEpoch,
+          topic = "foo",
+          partition = partitionId,
+          offset = 100L + partitionId,
+          expectedError = Errors.NONE,
+          version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled)
+        )
+      }
+
+      // Delete offset with topic that the group is subscribed to.
+      deleteOffset(
+        groupId = "grp",
+        topic = "foo",
+        partition = 0,
+        expectedPartitionError = Errors.GROUP_SUBSCRIBED_TO_TOPIC,
+        version = version.toShort
+      )
+
+      // Unsubscribe the topic.
+      if (useNewProtocol) {
+        consumerGroupHeartbeat(
+          groupId = "grp",
+          memberId = memberId,
+          memberEpoch = memberEpoch,
+          subscribedTopicNames = List.empty
+        )
+      } else {
+        leaveGroup(
+          groupId = "grp",
+          memberId = memberId,
+          useNewProtocol = false
+        )

Review Comment:
   I wonder if we could also test the "unsubscribe" case for the old protocol and 
the leave group case for the new one, for completeness.



##########
core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala:
##########
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import org.apache.kafka.common.message.ListGroupsResponseData
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import 
org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState
+import org.apache.kafka.coordinator.group.generic.GenericGroupState
+import org.junit.jupiter.api.Assertions.{assertEquals, fail}
+import org.junit.jupiter.api.{Tag, Timeout}
+import org.junit.jupiter.api.extension.ExtendWith
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)
+@Tag("integration")
+class ListGroupsRequestTest(cluster: ClusterInstance) extends 
GroupCoordinatorBaseRequestTest(cluster) {
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", 
value = "600000"),
+    new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value 
= "600000"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    testListGroupsWithNewProtocol()
+  }
+
+  @ClusterTest(serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit 
= {
+    testListGroupsWithOldProtocol()
+  }
+
+  @ClusterTest(clusterType = Type.ALL, serverProperties = Array(
+    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"false"),
+    new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+    new ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1")
+  ))
+  def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit 
= {
+    testListGroupsWithOldProtocol()
+  }
+
+  private def testListGroupsWithNewProtocol(): Unit = {
+    if (!isNewGroupCoordinatorEnabled) {
+      fail("Cannot use the new protocol with the old group coordinator.")
+    }
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    createOffsetsTopic()
+
+    // Create the topic.
+    createTopic(
+      topic = "foo",
+      numPartitions = 3
+    )
+
+    for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to 
ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) {
+
+      // Join the consumer group. Note that we don't heartbeat here so we must 
use
+      // a session long enough for the duration of the test.
+      val (memberId1, _) = joinConsumerGroup("grp", true)
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.STABLE.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.ASSIGNING.toString),
+        version = version
+      )
+
+      // Member 2 joins the group, triggering a rebalance.
+      val (memberId2, _) = joinConsumerGroup("grp", true)
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.RECONCILING.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.STABLE.toString),
+        version = version
+      )
+
+      leaveGroup(
+        groupId = "grp",
+        memberId = memberId1,
+        useNewProtocol = true
+      )
+      leaveGroup(
+        groupId = "grp",
+        memberId = memberId2,
+        useNewProtocol = true
+      )
+
+      checkListedGroups(
+        groupId = "grp",
+        state = ConsumerGroupState.EMPTY.toString,
+        statesFilterExpectingEmptyListedGroups = 
List(ConsumerGroupState.RECONCILING.toString),
+        version = version
+      )
+
+      deleteGroups(
+        groupIds = List("grp"),
+        expectedErrors = List(Errors.NONE),
+        version = ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled)
+      )
+
+      assertEquals(
+        List.empty,
+        listGroups(
+          statesFilter = List.empty,

Review Comment:
   Should we also have a test with multiple states in the filter? It seems that we 
only test an empty filter or a single state.



##########
core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala:
##########
@@ -181,6 +184,52 @@ class GroupCoordinatorBaseRequestTest(cluster: 
ClusterInstance) {
     response.data.groups.asScala.toList
   }
 
+  protected def deleteOffset(
+    groupId: String,
+    topic: String,
+    partition: Int,
+    expectedResponseError: Errors = Errors.NONE,
+    expectedPartitionError: Errors = Errors.NONE,
+    version: Short
+  ): Unit = {
+    if (expectedResponseError != Errors.NONE && expectedPartitionError != 
Errors.NONE) {
+      fail("deleteOffset: neither expectedResponseError nor expectedTopicError 
is Errors.NONE.")
+    }
+
+    val request = new OffsetDeleteRequest.Builder(
+      new OffsetDeleteRequestData()
+        .setGroupId(groupId)
+        .setTopics(new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(List(
+          new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
+            .setName(topic)
+            .setPartitions(List(
+              new OffsetDeleteRequestData.OffsetDeleteRequestPartition()
+                .setPartitionIndex(partition)
+            ).asJava)
+        ).asJava.iterator()))
+    ).build(version)
+
+    val expectedResponse = new OffsetDeleteResponseData()
+    if (expectedResponseError == Errors.NONE) {
+      expectedResponse
+        .setTopics(new 
OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(List(
+          new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+            .setName(topic)
+            .setPartitions(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(List(
+              new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+                .setPartitionIndex(partition)
+                .setErrorCode(expectedPartitionError.code())
+            ).asJava.iterator()))
+        ).asJava.iterator()))
+    } else {
+      expectedResponse
+        .setErrorCode(expectedResponseError.code())

Review Comment:
   nit: We could move `setErrorCode` up onto the previous line, since it fits
within the line-length limit there.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to