dajac commented on code in PR #14537: URL: https://github.com/apache/kafka/pull/14537#discussion_r1371838869
########## core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, DescribedGroupMember} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +import scala.jdk.CollectionConverters._ + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), Review Comment: I suppose that we don't need this in this case. 
########## core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, DescribedGroupMember} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +import scala.jdk.CollectionConverters._ + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", 
value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testDescribeGroups() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testDescribeGroups() + } + + private def testDescribeGroups(): Unit = { + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. 
+ val (memberId1, _) = joinConsumerGroupWithOldProtocol( + groupId = "grp-1", + metadata = Array(1, 2, 3) + ) + val (memberId2, _) = joinConsumerGroupWithOldProtocol( + groupId = "grp-2", + metadata = Array(1, 2, 3), + completeRebalance = false + ) + + for (version <- ApiKeys.DESCRIBE_GROUPS.oldestVersion() to ApiKeys.DESCRIBE_GROUPS.latestVersion(isUnstableApiEnabled)) { + assertEquals( + List( + new DescribedGroup() + .setGroupId("grp-1") + .setGroupState(GenericGroupState.STABLE.toString) + .setProtocolType("consumer") + .setProtocolData("consumer-range") + .setMembers(List( + new DescribedGroupMember() + .setMemberId(memberId1) + .setGroupInstanceId(null) + .setClientId("client-id") + .setClientHost("/127.0.0.1") + .setMemberMetadata(Array(1, 2, 3)) + ).asJava), + new DescribedGroup() + .setGroupId("grp-2") + .setGroupState(GenericGroupState.COMPLETING_REBALANCE.toString) + .setProtocolType("consumer") + .setMembers(List( + new DescribedGroupMember() + .setMemberId(memberId2) + .setGroupInstanceId(null) + .setClientId("client-id") + .setClientHost("/127.0.0.1") + ).asJava), + new DescribedGroup() Review Comment: nit: Should we add a comment for this case? It is not obvious that we should return a Dead group when the group does not exist. ########## core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, DescribedGroupMember} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +import scala.jdk.CollectionConverters._ + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testDescribeGroups() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = 
"group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testDescribeGroups() + } + + private def testDescribeGroups(): Unit = { + Review Comment: nit: We usually don't put an empty line after the method declaration. ########## core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, DescribedGroupMember} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +import scala.jdk.CollectionConverters._ + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testDescribeGroups() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), Review Comment: ditto. ########## core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala: ########## @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.ListGroupsResponseData +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = 
"offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testListGroupsWithNewProtocol() + } + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testListGroupsWithOldProtocol() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testListGroupsWithOldProtocol() + } + + private def testListGroupsWithNewProtocol(): Unit = { + if (!isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) { + Review Comment: nit: empty line. ########## core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala: ########## @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.ListGroupsResponseData +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new 
ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testListGroupsWithNewProtocol() + } + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testListGroupsWithOldProtocol() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testListGroupsWithOldProtocol() + } + + private def testListGroupsWithNewProtocol(): Unit = { + if (!isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. 
+ val (memberId1, _) = joinConsumerGroup("grp", true) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.STABLE.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.ASSIGNING.toString), + version = version + ) + + // Member 2 joins the group, triggering a rebalance. + val (memberId2, _) = joinConsumerGroup("grp", true) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.RECONCILING.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.STABLE.toString), + version = version + ) + + leaveGroup( + groupId = "grp", + memberId = memberId1, + useNewProtocol = true + ) + leaveGroup( + groupId = "grp", + memberId = memberId2, + useNewProtocol = true + ) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.EMPTY.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.RECONCILING.toString), + version = version + ) Review Comment: I find this construct a bit weird because we mix two things into one. It may be better to separate. One way would be to list group and assert the returned list. ########## core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala: ########## @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.ListGroupsResponseData +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testListGroupsWithNewProtocol() + } + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new 
ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testListGroupsWithOldProtocol() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testListGroupsWithOldProtocol() + } + + private def testListGroupsWithNewProtocol(): Unit = { + if (!isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId1, _) = joinConsumerGroup("grp", true) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.STABLE.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.ASSIGNING.toString), + version = version + ) + + // Member 2 joins the group, triggering a rebalance. 
+ val (memberId2, _) = joinConsumerGroup("grp", true) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.RECONCILING.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.STABLE.toString), + version = version + ) + + leaveGroup( + groupId = "grp", + memberId = memberId1, + useNewProtocol = true + ) + leaveGroup( + groupId = "grp", + memberId = memberId2, + useNewProtocol = true + ) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.EMPTY.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.RECONCILING.toString), + version = version + ) + + deleteGroups( + groupIds = List("grp"), + expectedErrors = List(Errors.NONE), + version = ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled) + ) + + assertEquals( + List.empty, + listGroups( + statesFilter = List.empty, + version = version.toShort + ) + ) + } + } + + private def testListGroupsWithOldProtocol(): Unit = { + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) { + Review Comment: ditto. ########## core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala: ########## @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.junit.jupiter.api.Assertions.fail +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testOffsetDelete(true) + } + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new 
ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testOffsetDelete(false) + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testOffsetDelete(false) + } + + private def testOffsetDelete(useNewProtocol: Boolean): Unit = { + if (useNewProtocol && !isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + for (version <- ApiKeys.OFFSET_DELETE.oldestVersion() to ApiKeys.OFFSET_DELETE.latestVersion(isUnstableApiEnabled)) { + Review Comment: nit: empty line. ########## core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala: ########## @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.junit.jupiter.api.Assertions.fail +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testOffsetDelete(true) + } + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new 
ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testOffsetDelete(false) + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testOffsetDelete(false) + } + + private def testOffsetDelete(useNewProtocol: Boolean): Unit = { + if (useNewProtocol && !isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + for (version <- ApiKeys.OFFSET_DELETE.oldestVersion() to ApiKeys.OFFSET_DELETE.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId, memberEpoch) = joinConsumerGroup( + groupId = "grp", + useNewProtocol = useNewProtocol + ) + + // Commit offsets. 
+ for (partitionId <- 0 to 2) { + commitOffset( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + topic = "foo", + partition = partitionId, + offset = 100L + partitionId, + expectedError = Errors.NONE, + version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled) + ) + } + + // Delete offset with topic that the group is subscribed to. + deleteOffset( + groupId = "grp", + topic = "foo", + partition = 0, + expectedPartitionError = Errors.GROUP_SUBSCRIBED_TO_TOPIC, + version = version.toShort + ) + + // Unsubscribe the topic. + if (useNewProtocol) { + consumerGroupHeartbeat( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + subscribedTopicNames = List.empty + ) + } else { + leaveGroup( + groupId = "grp", + memberId = memberId, + useNewProtocol = false + ) + } + + // Delete offsets. + for (partitionId <- 0 to 1) { + deleteOffset( + groupId = "grp", + topic = "foo", + partition = partitionId, + version = version.toShort + ) + } + + // Delete offsets with partition that doesn't exist. + deleteOffset( + groupId = "grp", + topic = "foo", + partition = 5, + expectedPartitionError = Errors.UNKNOWN_TOPIC_OR_PARTITION, + version = version.toShort + ) + + // Delete offset with unknown group id. + deleteOffset( Review Comment: Could we also add the case that led to https://github.com/apache/kafka/pull/14589? Could we also update the OffsetFetch integration tests while we are here? ########## core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, DescribedGroupMember} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +import scala.jdk.CollectionConverters._ + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testDescribeGroups() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = 
"group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testDescribeGroups() + } + + private def testDescribeGroups(): Unit = { + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId1, _) = joinConsumerGroupWithOldProtocol( + groupId = "grp-1", + metadata = Array(1, 2, 3) + ) + val (memberId2, _) = joinConsumerGroupWithOldProtocol( + groupId = "grp-2", + metadata = Array(1, 2, 3), + completeRebalance = false + ) + + for (version <- ApiKeys.DESCRIBE_GROUPS.oldestVersion() to ApiKeys.DESCRIBE_GROUPS.latestVersion(isUnstableApiEnabled)) { + assertEquals( + List( + new DescribedGroup() + .setGroupId("grp-1") + .setGroupState(GenericGroupState.STABLE.toString) + .setProtocolType("consumer") + .setProtocolData("consumer-range") + .setMembers(List( + new DescribedGroupMember() + .setMemberId(memberId1) + .setGroupInstanceId(null) + .setClientId("client-id") + .setClientHost("/127.0.0.1") + .setMemberMetadata(Array(1, 2, 3)) Review Comment: Could we also verify memberAssignment? ########## core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala: ########## @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def 
testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testDeleteGroups(true) + } + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDeleteGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testDeleteGroups(false) + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDeleteGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testDeleteGroups(false) + } + + private def testDeleteGroups(useNewProtocol: Boolean): Unit = { + if (useNewProtocol && !isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + // We test DeleteGroups on empty and non-empty groups. Here we create the non-empty group. 
+ joinConsumerGroup( + groupId = "grp-non-empty", + useNewProtocol = useNewProtocol + ) + + for (version <- ApiKeys.DELETE_GROUPS.oldestVersion() to ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId, memberEpoch) = joinConsumerGroup( + groupId = "grp", + useNewProtocol = useNewProtocol + ) + leaveGroup( Review Comment: nit: Let's add an empty line and a comment explaining why we need to leave the group. ########## core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala: ########## @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testDeleteGroups(true) + } + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDeleteGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + 
testDeleteGroups(false) + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDeleteGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testDeleteGroups(false) + } + + private def testDeleteGroups(useNewProtocol: Boolean): Unit = { + if (useNewProtocol && !isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + // We test DeleteGroups on empty and non-empty groups. Here we create the non-empty group. + joinConsumerGroup( + groupId = "grp-non-empty", + useNewProtocol = useNewProtocol + ) + + for (version <- ApiKeys.DELETE_GROUPS.oldestVersion() to ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled)) { + Review Comment: nit: We could remove this empty line. ########## core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, DescribedGroupMember} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +import scala.jdk.CollectionConverters._ + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testDescribeGroups() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = 
"group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testDescribeGroups() + } + + private def testDescribeGroups(): Unit = { + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId1, _) = joinConsumerGroupWithOldProtocol( + groupId = "grp-1", + metadata = Array(1, 2, 3) + ) + val (memberId2, _) = joinConsumerGroupWithOldProtocol( + groupId = "grp-2", + metadata = Array(1, 2, 3), + completeRebalance = false + ) + + for (version <- ApiKeys.DESCRIBE_GROUPS.oldestVersion() to ApiKeys.DESCRIBE_GROUPS.latestVersion(isUnstableApiEnabled)) { + assertEquals( + List( + new DescribedGroup() + .setGroupId("grp-1") + .setGroupState(GenericGroupState.STABLE.toString) + .setProtocolType("consumer") + .setProtocolData("consumer-range") + .setMembers(List( + new DescribedGroupMember() + .setMemberId(memberId1) + .setGroupInstanceId(null) + .setClientId("client-id") + .setClientHost("/127.0.0.1") + .setMemberMetadata(Array(1, 2, 3)) + ).asJava), + new DescribedGroup() + .setGroupId("grp-2") + .setGroupState(GenericGroupState.COMPLETING_REBALANCE.toString) + .setProtocolType("consumer") + .setMembers(List( + new DescribedGroupMember() + .setMemberId(memberId2) + .setGroupInstanceId(null) + .setClientId("client-id") + .setClientHost("/127.0.0.1") Review Comment: Should we assert both member metadata and assignment? 
########## core/src/test/scala/unit/kafka/server/DeleteGroupsRequestTest.scala: ########## @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class DeleteGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = 
"group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDeleteGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testDeleteGroups(true) + } + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDeleteGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testDeleteGroups(false) + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDeleteGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testDeleteGroups(false) + } + + private def testDeleteGroups(useNewProtocol: Boolean): Unit = { + if (useNewProtocol && !isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + // Join the consumer group. 
Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + // We test DeleteGroups on empty and non-empty groups. Here we create the non-empty group. + joinConsumerGroup( + groupId = "grp-non-empty", + useNewProtocol = useNewProtocol + ) + + for (version <- ApiKeys.DELETE_GROUPS.oldestVersion() to ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId, memberEpoch) = joinConsumerGroup( + groupId = "grp", + useNewProtocol = useNewProtocol + ) + leaveGroup( + groupId = "grp", + memberId = memberId, + useNewProtocol = useNewProtocol + ) + + deleteGroups( + groupIds = List("grp-non-empty", "grp"), + expectedErrors = List(Errors.NON_EMPTY_GROUP, Errors.NONE), + version = version.toShort + ) + + if (useNewProtocol) { + commitOffset( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + topic = "foo", + partition = 0, + offset = 100L, + expectedError = Errors.GROUP_ID_NOT_FOUND, + version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled) + ) + } else { + assertEquals( + List(new DescribedGroup() + .setGroupId("grp") + .setGroupState(GenericGroupState.DEAD.toString) + ), + describeGroups( + groupIds = List("grp"), + version = version.toShort Review Comment: Using `version.toShort` seems incorrect here, doesn't it? ########## core/src/test/scala/unit/kafka/server/DescribeGroupsRequestTest.scala: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.DescribeGroupsResponseData.{DescribedGroup, DescribedGroupMember} +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +import scala.jdk.CollectionConverters._ + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class DescribeGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDescribeGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testDescribeGroups() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = 
"group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testDescribeGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testDescribeGroups() + } + + private def testDescribeGroups(): Unit = { + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId1, _) = joinConsumerGroupWithOldProtocol( + groupId = "grp-1", + metadata = Array(1, 2, 3) + ) + val (memberId2, _) = joinConsumerGroupWithOldProtocol( + groupId = "grp-2", + metadata = Array(1, 2, 3), + completeRebalance = false + ) Review Comment: nit: Let's update the first comment and add one for the second member to explain why we don't complete the rebalance. ########## core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala: ########## @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.ListGroupsResponseData +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testListGroupsWithNewProtocol() + } + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new 
ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testListGroupsWithOldProtocol() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testListGroupsWithOldProtocol() + } + + private def testListGroupsWithNewProtocol(): Unit = { + if (!isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId1, _) = joinConsumerGroup("grp", true) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.STABLE.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.ASSIGNING.toString), + version = version + ) + + // Member 2 joins the group, triggering a rebalance. 
+ val (memberId2, _) = joinConsumerGroup("grp", true) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.RECONCILING.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.STABLE.toString), + version = version + ) + + leaveGroup( + groupId = "grp", + memberId = memberId1, + useNewProtocol = true + ) + leaveGroup( + groupId = "grp", + memberId = memberId2, + useNewProtocol = true + ) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.EMPTY.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.RECONCILING.toString), + version = version + ) + + deleteGroups( + groupIds = List("grp"), + expectedErrors = List(Errors.NONE), + version = ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled) + ) + + assertEquals( + List.empty, + listGroups( + statesFilter = List.empty, + version = version.toShort + ) + ) + } + } Review Comment: I wonder if we should have both old and new group when we test the new group coordinator. What do you think? ########## core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala: ########## @@ -220,31 +292,92 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { joinGroupResponse = connectAndReceive[JoinGroupResponse](joinGroupRequest) assertEquals(Errors.NONE.code, joinGroupResponse.data.errorCode) - val syncGroupRequestData = new SyncGroupRequestData() - .setGroupId(groupId) - .setMemberId(joinGroupResponse.data.memberId) - .setGenerationId(joinGroupResponse.data.generationId) - .setProtocolType("consumer") - .setProtocolName("consumer-range") - .setAssignments(List.empty.asJava) - - // Send the sync group request to complete the rebalance. 
- val syncGroupRequest = new SyncGroupRequest.Builder(syncGroupRequestData).build() - val syncGroupResponse = connectAndReceive[SyncGroupResponse](syncGroupRequest) - assertEquals(Errors.NONE.code, syncGroupResponse.data.errorCode) + if (completeRebalance) { + // Send the sync group request to complete the rebalance. + syncGroupWithOldProtocol( + groupId = groupId, + memberId = joinGroupResponse.data.memberId(), + generationId = joinGroupResponse.data.generationId() + ) + } (joinGroupResponse.data.memberId, joinGroupResponse.data.generationId) } protected def joinConsumerGroupWithNewProtocol(groupId: String): (String, Int) = { - // Heartbeat request to join the group. + val consumerGroupHeartbeatResponseData = consumerGroupHeartbeat( + groupId = groupId, + rebalanceTimeoutMs = 5 * 60 * 1000, + subscribedTopicNames = List("foo"), + topicPartitions = List.empty + ) + (consumerGroupHeartbeatResponseData.memberId, consumerGroupHeartbeatResponseData.memberEpoch) + } + + protected def joinConsumerGroup(groupId: String, useNewProtocol: Boolean): (String, Int) = { + if (useNewProtocol) { + // Note that we heartbeat only once to join the group and assume + // that the test will complete within the session timeout. + joinConsumerGroupWithNewProtocol(groupId) + } else { + // Note that we don't heartbeat and assume that the test will + // complete within the session timeout. 
+ joinConsumerGroupWithOldProtocol(groupId) + } + } + + protected def listGroups( + statesFilter: List[String], + version: Short + ): List[ListGroupsResponseData.ListedGroup] = { + val request = new ListGroupsRequest.Builder( + new ListGroupsRequestData() + .setStatesFilter(statesFilter.asJava) + ).build(version) + + val response = connectAndReceive[ListGroupsResponse](request) + + response.data().groups.asScala.toList + } + + protected def describeGroups( + groupIds: List[String], + version: Short + ): List[DescribeGroupsResponseData.DescribedGroup] = { + val describeGroupsRequest = new DescribeGroupsRequest.Builder( + new DescribeGroupsRequestData() + .setGroups(groupIds.asJava) + ).build(version) + + val describeGroupsResponse = connectAndReceive[DescribeGroupsResponse](describeGroupsRequest) + + describeGroupsResponse.data().groups().asScala.toList Review Comment: nit: In Scala, we don't have to put `()` for getters so we usually don't put them in our code base. Here you can use `describeGroupsResponse.data.groups.asScala.toList`. There are other cases where you could remove them. Could you please look at them? ########## core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala: ########## @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.ListGroupsResponseData +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testListGroupsWithNewProtocol() + } + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new 
ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testListGroupsWithOldProtocol() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testListGroupsWithOldProtocol() + } + + private def testListGroupsWithNewProtocol(): Unit = { + if (!isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId1, _) = joinConsumerGroup("grp", true) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.STABLE.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.ASSIGNING.toString), + version = version + ) + + // Member 2 joins the group, triggering a rebalance. 
+ val (memberId2, _) = joinConsumerGroup("grp", true) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.RECONCILING.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.STABLE.toString), + version = version + ) + + leaveGroup( + groupId = "grp", + memberId = memberId1, + useNewProtocol = true + ) + leaveGroup( + groupId = "grp", + memberId = memberId2, + useNewProtocol = true + ) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.EMPTY.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.RECONCILING.toString), + version = version + ) + + deleteGroups( + groupIds = List("grp"), + expectedErrors = List(Errors.NONE), + version = ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled) + ) + + assertEquals( + List.empty, + listGroups( + statesFilter = List.empty, + version = version.toShort + ) + ) + } + } + + private def testListGroupsWithOldProtocol(): Unit = { + Review Comment: nit: Empty line. ########## core/src/test/scala/unit/kafka/server/OffsetDeleteRequestTest.scala: ########## @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.junit.jupiter.api.Assertions.fail +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class OffsetDeleteRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testOffsetDelete(true) + } + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testOffsetDelete(false) + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new 
ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testOffsetDeleteWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testOffsetDelete(false) + } + + private def testOffsetDelete(useNewProtocol: Boolean): Unit = { + if (useNewProtocol && !isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + for (version <- ApiKeys.OFFSET_DELETE.oldestVersion() to ApiKeys.OFFSET_DELETE.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId, memberEpoch) = joinConsumerGroup( + groupId = "grp", + useNewProtocol = useNewProtocol + ) + + // Commit offsets. + for (partitionId <- 0 to 2) { + commitOffset( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + topic = "foo", + partition = partitionId, + offset = 100L + partitionId, + expectedError = Errors.NONE, + version = ApiKeys.OFFSET_COMMIT.latestVersion(isUnstableApiEnabled) + ) + } + + // Delete offset with topic that the group is subscribed to. + deleteOffset( + groupId = "grp", + topic = "foo", + partition = 0, + expectedPartitionError = Errors.GROUP_SUBSCRIBED_TO_TOPIC, + version = version.toShort + ) + + // Unsubscribe the topic. 
+ if (useNewProtocol) { + consumerGroupHeartbeat( + groupId = "grp", + memberId = memberId, + memberEpoch = memberEpoch, + subscribedTopicNames = List.empty + ) + } else { + leaveGroup( + groupId = "grp", + memberId = memberId, + useNewProtocol = false + ) Review Comment: I wonder if we could also test the "unsubscribe" case for the old protocol and the leave group case for the new one for completeness. ########## core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala: ########## @@ -0,0 +1,236 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import org.apache.kafka.common.message.ListGroupsResponseData +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.coordinator.group.consumer.ConsumerGroup.ConsumerGroupState +import org.apache.kafka.coordinator.group.generic.GenericGroupState +import org.junit.jupiter.api.Assertions.{assertEquals, fail} +import org.junit.jupiter.api.{Tag, Timeout} +import org.junit.jupiter.api.extension.ExtendWith + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) +@Tag("integration") +class ListGroupsRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "group.consumer.max.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "group.consumer.session.timeout.ms", value = "600000"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithNewConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testListGroupsWithNewProtocol() + } + + @ClusterTest(serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def 
testListGroupsWithOldConsumerGroupProtocolAndNewGroupCoordinator(): Unit = { + testListGroupsWithOldProtocol() + } + + @ClusterTest(clusterType = Type.ALL, serverProperties = Array( + new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), + new ClusterConfigProperty(key = "group.coordinator.new.enable", value = "false"), + new ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), + new ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1") + )) + def testListGroupsWithOldConsumerGroupProtocolAndOldGroupCoordinator(): Unit = { + testListGroupsWithOldProtocol() + } + + private def testListGroupsWithNewProtocol(): Unit = { + if (!isNewGroupCoordinatorEnabled) { + fail("Cannot use the new protocol with the old group coordinator.") + } + + // Creates the __consumer_offsets topics because it won't be created automatically + // in this test because it does not use FindCoordinator API. + createOffsetsTopic() + + // Create the topic. + createTopic( + topic = "foo", + numPartitions = 3 + ) + + for (version <- ApiKeys.LIST_GROUPS.oldestVersion() to ApiKeys.LIST_GROUPS.latestVersion(isUnstableApiEnabled)) { + + // Join the consumer group. Note that we don't heartbeat here so we must use + // a session long enough for the duration of the test. + val (memberId1, _) = joinConsumerGroup("grp", true) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.STABLE.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.ASSIGNING.toString), + version = version + ) + + // Member 2 joins the group, triggering a rebalance. 
+ val (memberId2, _) = joinConsumerGroup("grp", true) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.RECONCILING.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.STABLE.toString), + version = version + ) + + leaveGroup( + groupId = "grp", + memberId = memberId1, + useNewProtocol = true + ) + leaveGroup( + groupId = "grp", + memberId = memberId2, + useNewProtocol = true + ) + + checkListedGroups( + groupId = "grp", + state = ConsumerGroupState.EMPTY.toString, + statesFilterExpectingEmptyListedGroups = List(ConsumerGroupState.RECONCILING.toString), + version = version + ) + + deleteGroups( + groupIds = List("grp"), + expectedErrors = List(Errors.NONE), + version = ApiKeys.DELETE_GROUPS.latestVersion(isUnstableApiEnabled) + ) + + assertEquals( + List.empty, + listGroups( + statesFilter = List.empty, Review Comment: Should we also have a test with multiple states? It seems that we only do empty or one. ########## core/src/test/scala/unit/kafka/server/GroupCoordinatorBaseRequestTest.scala: ########## @@ -181,6 +184,52 @@ class GroupCoordinatorBaseRequestTest(cluster: ClusterInstance) { response.data.groups.asScala.toList } + protected def deleteOffset( + groupId: String, + topic: String, + partition: Int, + expectedResponseError: Errors = Errors.NONE, + expectedPartitionError: Errors = Errors.NONE, + version: Short + ): Unit = { + if (expectedResponseError != Errors.NONE && expectedPartitionError != Errors.NONE) { + fail("deleteOffset: neither expectedResponseError nor expectedTopicError is Errors.NONE.") + } + + val request = new OffsetDeleteRequest.Builder( + new OffsetDeleteRequestData() + .setGroupId(groupId) + .setTopics(new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(List( + new OffsetDeleteRequestData.OffsetDeleteRequestTopic() + .setName(topic) + .setPartitions(List( + new OffsetDeleteRequestData.OffsetDeleteRequestPartition() + .setPartitionIndex(partition) + ).asJava) + ).asJava.iterator())) + 
).build(version) + + val expectedResponse = new OffsetDeleteResponseData() + if (expectedResponseError == Errors.NONE) { + expectedResponse + .setTopics(new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(List( + new OffsetDeleteResponseData.OffsetDeleteResponseTopic() + .setName(topic) + .setPartitions(new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(List( + new OffsetDeleteResponseData.OffsetDeleteResponsePartition() + .setPartitionIndex(partition) + .setErrorCode(expectedPartitionError.code()) + ).asJava.iterator())) + ).asJava.iterator())) + } else { + expectedResponse + .setErrorCode(expectedResponseError.code()) Review Comment: nit: We could bring back `setErrorCode` on the previous line as there is space for it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org