This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fc21a1553d2 KAFKA-19027 Replace
ConsumerGroupCommandTestUtils#generator by ClusterTestDefaults (#19347)
fc21a1553d2 is described below
commit fc21a1553d248f8838695a527341b4c0be1778e5
Author: Nick Guo <[email protected]>
AuthorDate: Mon Apr 14 09:05:24 2025 -0400
KAFKA-19027 Replace ConsumerGroupCommandTestUtils#generator by
ClusterTestDefaults (#19347)
jira: https://issues.apache.org/jira/browse/KAFKA-19027
[KAFKA-18329](https://issues.apache.org/jira/browse/KAFKA-18329) will
remove old coordinator, so `ConsumerGroupCommandTestUtils#generator`
creates only one config now. Hence, we should use ClusterTestDefaults to
write more readable code for tests.
Reviewers: Ken Huang <[email protected]>, PoAn Yang
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../group/ConsumerGroupCommandTestUtils.java | 49 +-----------
.../consumer/group/DeleteConsumerGroupsTest.java | 37 +++++----
...OffsetsConsumerGroupCommandIntegrationTest.java | 44 +++++++----
.../consumer/group/DescribeConsumerGroupTest.java | 89 +++++++++++++---------
.../consumer/group/ListConsumerGroupTest.java | 42 ++++++----
.../group/ResetConsumerGroupOffsetTest.java | 74 ++++++++++--------
6 files changed, 173 insertions(+), 162 deletions(-)
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
index 075bbc3a734..bf67bcf0d87 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java
@@ -20,16 +20,12 @@ package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.test.api.ClusterConfig;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.server.common.Feature;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -38,58 +34,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
-import static org.apache.kafka.common.test.api.Type.CO_KRAFT;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
-import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
/**
- * The old test framework {@link
kafka.api.BaseConsumerTest#getTestGroupProtocolParametersAll} test for the
following cases:
- * <ul>
- * <li>(KRAFT server) with (group.coordinator.new.enable=true) with
(classic group protocol) = 1 case</li>
- * <li>(KRAFT server) with (group.coordinator.new.enable=true) with
(consumer group protocol) = 1 case</li>
- * </ul>
- * <p>
- * The new test framework run seven cases for the following cases:
- * <ul>
- * <li>(KRAFT / CO_KRAFT servers) with
(group.coordinator.new.enable=false) with (classic group protocol) = 2
cases</li>
- * <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=true)
with (classic group protocol) = 2 cases</li>
- * <li>(KRAFT / CO_KRAFT servers) with (group.coordinator.new.enable=true)
with (consumer group protocol) = 2 cases</li>
- * </ul>
- * <p>
- * We can reduce the number of cases as same as the old test framework by
using the following methods:
- * <ul>
- * <li>(CO_KRAFT servers) with (group.coordinator.new.enable=true) with
(classic / consumer group protocols) = 2 cases</li>
- * </ul>
- * <ul>
- * <li>(KRAFT server) with (group.coordinator.new.enable=false) with
(classic group protocol) = 1 case</li>
- * </ul>
+ * This class provides methods to build and manage consumer instances.
*/
class ConsumerGroupCommandTestUtils {
private ConsumerGroupCommandTestUtils() {
}
- static List<ClusterConfig> generator() {
- Map<String, String> serverProperties = new HashMap<>();
- serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1");
- serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
- serverProperties.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "1000");
- serverProperties.put(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG,
"500");
- serverProperties.put(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
"500");
-
- return Collections.singletonList(ClusterConfig.defaultBuilder()
- .setTypes(Collections.singleton(CO_KRAFT))
- .setServerProperties(serverProperties)
- .setTags(Collections.singletonList("kraftGroupCoordinator"))
- .setFeatures(Utils.mkMap(
- Utils.mkEntry(Feature.TRANSACTION_VERSION, (short) 2),
- Utils.mkEntry(Feature.GROUP_VERSION, (short) 1)))
- .build());
- }
-
static <T> AutoCloseable buildConsumers(int numberOfConsumers,
Set<TopicPartition> partitions,
Supplier<KafkaConsumer<T, T>>
consumerSupplier) {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
index 7d552dcbe48..4eda3a1af15 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java
@@ -26,8 +26,10 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.api.ClusterConfig;
-import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
@@ -36,7 +38,6 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -56,6 +57,9 @@ import static
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNM
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.common.GroupState.EMPTY;
import static org.apache.kafka.common.GroupState.STABLE;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -64,11 +68,15 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class DeleteConsumerGroupsTest {
-
- private static List<ClusterConfig> generator() {
- return ConsumerGroupCommandTestUtils.generator();
+@ClusterTestDefaults(
+ types = {Type.CO_KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
value = "1"),
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
}
+)
+public class DeleteConsumerGroupsTest {
@Test
public void testDeleteWithTopicOption() {
@@ -76,7 +84,8 @@ public class DeleteConsumerGroupsTest {
assertThrows(OptionException.class, () ->
ConsumerGroupCommandOptions.fromArgs(cgcArgs));
}
- @ClusterTemplate("generator")
+
+ @ClusterTest
public void testDeleteCmdNonExistingGroup(ClusterInstance cluster) {
String missingGroupId = getDummyGroupId();
String[] cgcArgs = new String[]{"--bootstrap-server",
cluster.bootstrapServers(), "--delete", "--group", missingGroupId};
@@ -87,7 +96,7 @@ public class DeleteConsumerGroupsTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDeleteNonExistingGroup(ClusterInstance cluster) {
String missingGroupId = getDummyGroupId();
String[] cgcArgs = new String[]{"--bootstrap-server",
cluster.bootstrapServers(), "--delete", "--group", missingGroupId};
@@ -101,7 +110,7 @@ public class DeleteConsumerGroupsTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDeleteNonEmptyGroup(ClusterInstance cluster) throws
Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String groupId = composeGroupId(groupProtocol);
@@ -134,7 +143,7 @@ public class DeleteConsumerGroupsTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
void testDeleteEmptyGroup(ClusterInstance cluster) throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String groupId = composeGroupId(groupProtocol);
@@ -168,7 +177,7 @@ public class DeleteConsumerGroupsTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDeleteCmdAllGroups(ClusterInstance cluster) throws
Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String topicName = composeTopicName(groupProtocol);
@@ -206,7 +215,7 @@ public class DeleteConsumerGroupsTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDeleteCmdWithMixOfSuccessAndError(ClusterInstance cluster)
throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String groupId = composeGroupId(groupProtocol);
@@ -239,7 +248,7 @@ public class DeleteConsumerGroupsTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDeleteWithMixOfSuccessAndError(ClusterInstance cluster)
throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String groupId = composeGroupId(groupProtocol);
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
index 133c9bfb53a..d273c1fa529 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java
@@ -32,8 +32,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.api.ClusterConfig;
-import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.junit.jupiter.api.Assertions;
@@ -41,15 +43,29 @@ import org.junit.jupiter.api.Assertions;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
+@ClusterTestDefaults(
+ types = {Type.CO_KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value =
"1"),
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
value = "1"),
+ @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
value = "1000"),
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+ }
+)
public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
public static final String TOPIC_PREFIX = "foo.";
public static final String GROUP_PREFIX = "test.group.";
@@ -59,11 +75,7 @@ public class
DeleteOffsetsConsumerGroupCommandIntegrationTest {
this.clusterInstance = clusterInstance;
}
- private static List<ClusterConfig> generator() {
- return ConsumerGroupCommandTestUtils.generator();
- }
-
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDeleteOffsetsNonExistingGroup() {
String group = "missing.group";
String topic = "foo:1";
@@ -73,7 +85,7 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest
{
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
for (GroupProtocol groupProtocol :
clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -85,7 +97,7 @@ public class DeleteOffsetsConsumerGroupCommandIntegrationTest
{
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() {
for (GroupProtocol groupProtocol :
clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -97,7 +109,7 @@ public class
DeleteOffsetsConsumerGroupCommandIntegrationTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() {
for (GroupProtocol groupProtocol :
clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -107,7 +119,7 @@ public class
DeleteOffsetsConsumerGroupCommandIntegrationTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() {
for (GroupProtocol groupProtocol :
clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -117,7 +129,7 @@ public class
DeleteOffsetsConsumerGroupCommandIntegrationTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() {
for (GroupProtocol groupProtocol :
clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -129,7 +141,7 @@ public class
DeleteOffsetsConsumerGroupCommandIntegrationTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() {
for (GroupProtocol groupProtocol :
clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -141,7 +153,7 @@ public class
DeleteOffsetsConsumerGroupCommandIntegrationTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() {
for (GroupProtocol groupProtocol :
clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
@@ -151,7 +163,7 @@ public class
DeleteOffsetsConsumerGroupCommandIntegrationTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly() {
for (GroupProtocol groupProtocol :
clusterInstance.supportedGroupProtocols()) {
String topic = TOPIC_PREFIX + groupProtocol.name();
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
index b30c3674514..43c248cc9b6 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java
@@ -36,8 +36,10 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.api.ClusterConfig;
-import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.TestUtils;
@@ -66,6 +68,11 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.test.TestUtils.RANDOM;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -73,6 +80,16 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+@ClusterTestDefaults(
+ types = {Type.CO_KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value =
"1"),
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
value = "1"),
+ @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
value = "1000"),
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+ }
+)
public class DescribeConsumerGroupTest {
private static final String TOPIC_PREFIX = "test.topic.";
private static final String GROUP_PREFIX = "test.group.";
@@ -82,11 +99,7 @@ public class DescribeConsumerGroupTest {
private static final List<List<String>> DESCRIBE_TYPES =
Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS,
DESCRIBE_TYPE_STATE).flatMap(Collection::stream).toList();
private ClusterInstance clusterInstance;
- private static List<ClusterConfig> generator() {
- return ConsumerGroupCommandTestUtils.generator();
- }
-
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeNonExistingGroup(ClusterInstance clusterInstance) {
String missingGroup = "missing.group";
@@ -106,7 +119,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeOffsetsOfNonExistingGroup(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
String missingGroup = "missing.group";
@@ -129,7 +142,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeMembersOfNonExistingGroup(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
String missingGroup = "missing.group";
@@ -150,7 +163,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeStateOfNonExistingGroup(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
String missingGroup = "missing.group";
@@ -171,7 +184,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeGroupOffsets(ClusterInstance clusterInstance)
throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -212,7 +225,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeGroupMembers(ClusterInstance clusterInstance)
throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -258,7 +271,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeGroupMemberWithMigration(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
String topic = TOPIC_PREFIX + "migration";
@@ -309,7 +322,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeGroupState(ClusterInstance clusterInstance) throws
Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -351,7 +364,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeExistingGroups(ClusterInstance clusterInstance)
throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -392,7 +405,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeAllExistingGroups(ClusterInstance clusterInstance)
throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -433,7 +446,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeOffsetsOfExistingGroup(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -473,7 +486,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeMembersOfExistingGroup(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -500,7 +513,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeStateOfExistingGroup(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -523,7 +536,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeStateOfExistingGroupWithNonDefaultAssignor(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -561,7 +574,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeExistingGroupWithNoMembers(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -592,7 +605,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeOffsetsOfExistingGroupWithNoMembers(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -629,7 +642,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeMembersOfExistingGroupWithNoMembers(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -658,7 +671,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeStateOfExistingGroupWithNoMembers(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -689,7 +702,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeWithConsumersWithoutAssignedPartitions(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -716,7 +729,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeOffsetsWithConsumersWithoutAssignedPartitions(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -739,7 +752,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeMembersWithConsumersWithoutAssignedPartitions(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -769,7 +782,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeStateWithConsumersWithoutAssignedPartitions(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -789,7 +802,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeWithMultiPartitionTopicAndMultipleConsumers(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -816,7 +829,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -840,7 +853,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -868,7 +881,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeStateWithMultiPartitionTopicAndMultipleConsumers(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -888,7 +901,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeSimpleConsumerGroup(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
// Ensure that the offsets of consumers which don't use group
management are still displayed
@@ -909,7 +922,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeGroupWithShortInitializationTimeout(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -935,7 +948,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeGroupOffsetsWithShortInitializationTimeout(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -957,7 +970,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeGroupMembersWithShortInitializationTimeout(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -981,7 +994,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testDescribeGroupStateWithShortInitializationTimeout(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
@@ -1003,7 +1016,7 @@ public class DescribeConsumerGroupTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testDescribeNonOffsetCommitGroup(ClusterInstance
clusterInstance) throws Exception {
this.clusterInstance = clusterInstance;
for (GroupProtocol groupProtocol:
clusterInstance.supportedGroupProtocols()) {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
index 16db6fb93bc..434e669ac20 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
@@ -29,8 +29,10 @@ import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.api.ClusterConfig;
-import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
@@ -61,8 +63,22 @@ import static
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CO
import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-
-
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+
+@ClusterTestDefaults(
+ types = {Type.CO_KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value =
"1"),
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
value = "1"),
+ @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
value = "1000"),
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+ }
+)
public class ListConsumerGroupTest {
private static final String TOPIC_PREFIX = "test.topic.";
private static final String TOPIC_PARTITIONS_GROUP_PREFIX =
"test.topic.partitions.group.";
@@ -74,15 +90,11 @@ public class ListConsumerGroupTest {
this.clusterInstance = clusterInstance;
}
- private static List<ClusterConfig> defaultGenerator() {
- return ConsumerGroupCommandTestUtils.generator();
- }
-
private List<GroupProtocol> supportedGroupProtocols() {
return new ArrayList<>(clusterInstance.supportedGroupProtocols());
}
- @ClusterTemplate("defaultGenerator")
+ @ClusterTest
public void testListConsumerGroupsWithoutFilters() throws Exception {
for (int i = 0; i < supportedGroupProtocols().size(); i++) {
GroupProtocol groupProtocol = supportedGroupProtocols().get(i);
@@ -111,13 +123,13 @@ public class ListConsumerGroupTest {
}
}
- @ClusterTemplate("defaultGenerator")
+ @ClusterTest
public void testListWithUnrecognizedNewConsumerOption() {
String[] cgcArgs = new String[]{"--new-consumer",
"--bootstrap-server", clusterInstance.bootstrapServers(), "--list"};
Assertions.assertThrows(OptionException.class, () ->
getConsumerGroupService(cgcArgs));
}
- @ClusterTemplate("defaultGenerator")
+ @ClusterTest
public void testListConsumerGroupsWithStates() throws Exception {
for (int i = 0; i < supportedGroupProtocols().size(); i++) {
GroupProtocol groupProtocol = supportedGroupProtocols().get(i);
@@ -181,7 +193,7 @@ public class ListConsumerGroupTest {
}
}
- @ClusterTemplate("defaultGenerator")
+ @ClusterTest
public void testListConsumerGroupsWithTypesClassicProtocol() throws
Exception {
GroupProtocol groupProtocol = GroupProtocol.CLASSIC;
String topic = TOPIC_PREFIX + groupProtocol.name;
@@ -235,7 +247,7 @@ public class ListConsumerGroupTest {
}
}
- @ClusterTemplate("defaultGenerator")
+ @ClusterTest
public void testListConsumerGroupsWithTypesConsumerProtocol() throws
Exception {
GroupProtocol groupProtocol = GroupProtocol.CONSUMER;
String topic = TOPIC_PREFIX + groupProtocol.name;
@@ -322,7 +334,7 @@ public class ListConsumerGroupTest {
}
}
- @ClusterTemplate("defaultGenerator")
+ @ClusterTest
public void testListGroupCommandClassicProtocol() throws Exception {
GroupProtocol groupProtocol = GroupProtocol.CLASSIC;
String topic = TOPIC_PREFIX + groupProtocol.name;
@@ -407,7 +419,7 @@ public class ListConsumerGroupTest {
}
}
- @ClusterTemplate("defaultGenerator")
+ @ClusterTest
public void testListGroupCommandConsumerProtocol() throws Exception {
GroupProtocol groupProtocol = GroupProtocol.CONSUMER;
String topic = TOPIC_PREFIX + groupProtocol.name;
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
index bcc06301d63..3c417cdeeac 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ResetConsumerGroupOffsetTest.java
@@ -30,8 +30,10 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.ClusterInstance;
-import org.apache.kafka.common.test.api.ClusterConfig;
-import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.TestUtils;
import java.io.BufferedWriter;
@@ -73,7 +75,11 @@ import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -89,15 +95,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
* - scope=topics+partitions, scenario=to-earliest
* - export/import
*/
+@ClusterTestDefaults(
+ types = {Type.CO_KRAFT},
+ serverProperties = {
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value =
"1"),
+ @ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
value = "1"),
+ @ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
value = "1000"),
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+ @ClusterConfigProperty(key =
CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, value = "500"),
+ }
+)
public class ResetConsumerGroupOffsetTest {
private static final String TOPIC_PREFIX = "foo-";
private static final String GROUP_PREFIX = "test.group-";
- private static List<ClusterConfig> generator() {
- return ConsumerGroupCommandTestUtils.generator();
- }
-
private String[] basicArgs(ClusterInstance cluster) {
return new String[]{"--reset-offsets",
"--bootstrap-server", cluster.bootstrapServers(),
@@ -125,7 +137,7 @@ public class ResetConsumerGroupOffsetTest {
return res.toArray(new String[0]);
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsNotExistingGroup(ClusterInstance cluster)
throws Exception {
String topic = generateRandomTopic();
String group = "missing.group";
@@ -138,7 +150,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsExistingTopic(ClusterInstance cluster) {
String topic = generateRandomTopic();
String group = "new.group";
@@ -152,7 +164,7 @@ public class ResetConsumerGroupOffsetTest {
50, false, singletonList(topic));
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsExistingTopicSelectedGroups(ClusterInstance
cluster) throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String topic = generateRandomTopic();
@@ -175,7 +187,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsExistingTopicAllGroups(ClusterInstance
cluster) throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String topic = generateRandomTopic();
@@ -197,7 +209,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsAllTopicsAllGroups(ClusterInstance cluster)
throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String groupId = generateRandomGroupId();
@@ -229,7 +241,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsToLocalDateTime(ClusterInstance cluster)
throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -252,7 +264,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsToZonedDateTime(ClusterInstance cluster)
throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -276,7 +288,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsByDuration(ClusterInstance cluster) throws
Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -288,7 +300,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsByDurationToEarliest(ClusterInstance cluster)
throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -300,7 +312,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testResetOffsetsByDurationFallbackToLatestWhenNoRecords(ClusterInstance
cluster) throws ExecutionException, InterruptedException {
String group = generateRandomGroupId();
String topic = generateRandomTopic();
@@ -314,7 +326,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsToEarliest(ClusterInstance cluster) throws
Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -326,7 +338,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsToLatest(ClusterInstance cluster) throws
Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -339,7 +351,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsToCurrentOffset(ClusterInstance cluster)
throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -352,7 +364,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsToSpecificOffset(ClusterInstance cluster)
throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -364,7 +376,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsShiftPlus(ClusterInstance cluster) throws
Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -377,7 +389,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsShiftMinus(ClusterInstance cluster) throws
Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -390,7 +402,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsShiftByLowerThanEarliest(ClusterInstance
cluster) throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -403,7 +415,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsShiftByHigherThanLatest(ClusterInstance
cluster) throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -416,7 +428,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsToEarliestOnOneTopic(ClusterInstance cluster)
throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -428,7 +440,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testResetOffsetsToEarliestOnOneTopicAndPartition(ClusterInstance cluster)
throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -454,7 +466,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetOffsetsToEarliestOnTopics(ClusterInstance cluster)
throws Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -489,7 +501,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void
testResetOffsetsToEarliestOnTopicsAndPartitions(ClusterInstance cluster) throws
Exception {
for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
String group = generateRandomGroupId();
@@ -530,7 +542,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
// This one deals with old CSV export/import format for a single --group
arg:
// "topic,partition,offset" to support old behavior
public void testResetOffsetsExportImportPlanSingleGroupArg(ClusterInstance
cluster) throws Exception {
@@ -570,7 +582,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
// This one deals with universal CSV export/import file format
"group,topic,partition,offset",
// supporting multiple --group args or --all-groups arg
public void testResetOffsetsExportImportPlan(ClusterInstance cluster)
throws Exception {
@@ -637,7 +649,7 @@ public class ResetConsumerGroupOffsetTest {
}
}
- @ClusterTemplate("generator")
+ @ClusterTest
public void testResetWithUnrecognizedNewConsumerOption(ClusterInstance
cluster) {
String group = generateRandomGroupId();
String[] cgcArgs = new String[]{"--new-consumer",