This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 01afba8fdbd MINOR: Refactor ShareConsumerTest to use ClusterTestExtensions. (#18656)
01afba8fdbd is described below

commit 01afba8fdbd2d24adbc6ab64bca361ed53f870ac
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu Jan 23 22:05:33 2025 +0530

    MINOR: Refactor ShareConsumerTest to use ClusterTestExtensions. (#18656)
    
    Reviewers: ShivsundarR <[email protected]>, Apoorv Mittal <[email protected]>, Andrew Schofield <[email protected]>
---
 .../java/kafka/test/api/ShareConsumerTest.java     | 582 +++++++++++----------
 .../kafka/common/test/api/ClusterInstance.java     |  16 +
 2 files changed, 336 insertions(+), 262 deletions(-)

diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index 237fe34bbe7..c7e62d0eb70 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -26,12 +26,12 @@ import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.consumer.AcknowledgeType;
 import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.KafkaShareConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.consumer.ShareConsumer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -48,22 +48,21 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.test.KafkaClusterTestKit;
-import org.apache.kafka.common.test.TestKitNodes;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.ClusterTestExtensions;
 import org.apache.kafka.common.test.api.Flaky;
+import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.test.TestUtils;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -101,67 +100,64 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @Timeout(1200)
 @Tag("integration")
+@ExtendWith(ClusterTestExtensions.class)
+@ClusterTestDefaults(
+    serverProperties = {
+        @ClusterConfigProperty(key = "auto.create.topics.enable", value = 
"false"),
+        @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", 
value = "classic,consumer,share"),
+        @ClusterConfigProperty(key = "group.share.enable", value = "true"),
+        @ClusterConfigProperty(key = "group.share.partition.max.record.locks", 
value = "10000"),
+        @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", 
value = "15000"),
+        @ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+        @ClusterConfigProperty(key = "share.coordinator.state.topic.min.isr", 
value = "1"),
+        @ClusterConfigProperty(key = 
"share.coordinator.state.topic.num.partitions", value = "3"),
+        @ClusterConfigProperty(key = 
"share.coordinator.state.topic.replication.factor", value = "1"),
+        @ClusterConfigProperty(key = "transaction.state.log.min.isr", value = 
"1"),
+        @ClusterConfigProperty(key = 
"transaction.state.log.replication.factor", value = "1"),
+        @ClusterConfigProperty(key = "unstable.api.versions.enable", value = 
"true")
+    },
+    types = {Type.KRAFT}
+)
 public class ShareConsumerTest {
-    private KafkaClusterTestKit cluster;
+    private final ClusterInstance cluster;
     private final TopicPartition tp = new TopicPartition("topic", 0);
     private final TopicPartition tp2 = new TopicPartition("topic2", 0);
     private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
     private List<TopicPartition> sgsTopicPartitions;
 
-    private Admin adminClient;
-
-    @BeforeEach
-    public void createCluster(TestInfo testInfo) throws Exception {
-        cluster = new KafkaClusterTestKit.Builder(
-            new TestKitNodes.Builder()
-                .setNumBrokerNodes(1)
-                .setNumControllerNodes(1)
-                .build())
-            .setConfigProp("auto.create.topics.enable", "false")
-            .setConfigProp("group.coordinator.rebalance.protocols", 
"classic,consumer,share")
-            .setConfigProp("group.share.enable", "true")
-            .setConfigProp("group.share.partition.max.record.locks", "10000")
-            .setConfigProp("group.share.record.lock.duration.ms", "15000")
-            .setConfigProp("offsets.topic.replication.factor", "1")
-            .setConfigProp("share.coordinator.state.topic.min.isr", "1")
-            .setConfigProp("share.coordinator.state.topic.num.partitions", "3")
-            .setConfigProp("share.coordinator.state.topic.replication.factor", 
"1")
-            .setConfigProp("transaction.state.log.min.isr", "1")
-            .setConfigProp("transaction.state.log.replication.factor", "1")
-            .setConfigProp("unstable.api.versions.enable", "true")
-            .build();
-        cluster.format();
-        cluster.startup();
-        cluster.waitForActiveController();
-        cluster.waitForReadyBrokers();
-        createTopic("topic");
-        createTopic("topic2");
-        adminClient = createAdminClient();
-        sgsTopicPartitions = IntStream.range(0, 3)
-            .mapToObj(part -> new 
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, part))
-            .toList();
-        warmup();
-    }
-
-    @AfterEach
-    public void destroyCluster() throws Exception {
-        adminClient.close();
-        cluster.close();
-    }
-
-    @Test
+    public ShareConsumerTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    private void setup() {
+        try {
+            this.cluster.waitForReadyBrokers();
+            createTopic("topic");
+            createTopic("topic2");
+            sgsTopicPartitions = IntStream.range(0, 3)
+                .mapToObj(part -> new 
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, part))
+                .toList();
+            this.warmup();
+        } catch (Exception e) {
+            fail(e);
+        }
+    }
+
+    @ClusterTest
     public void testPollNoSubscribeFails() {
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        setup();
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
             assertEquals(Collections.emptySet(), shareConsumer.subscription());
             // "Consumer is not subscribed to any topics."
             assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
         }
     }
 
-    @Test
+    @ClusterTest
     public void testSubscribeAndPollNoRecords() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
             Set<String> subscription = Collections.singleton(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
@@ -171,10 +167,11 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testSubscribePollUnsubscribe() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
             Set<String> subscription = Collections.singleton(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
@@ -186,10 +183,11 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testSubscribePollSubscribe() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
             Set<String> subscription = Collections.singleton(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
@@ -203,10 +201,11 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testSubscribeUnsubscribePollFails() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
             Set<String> subscription = Collections.singleton(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
@@ -220,10 +219,11 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testSubscribeSubscribeEmptyPollFails() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
             Set<String> subscription = Collections.singleton(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
@@ -237,11 +237,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testSubscriptionAndPoll() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -253,11 +254,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testSubscriptionAndPollMultiple() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -275,11 +277,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testAcknowledgementSentOnSubscriptionChange() throws 
ExecutionException, InterruptedException {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap = new 
HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap = new 
HashMap<>();
@@ -311,11 +314,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() 
throws Exception {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap = new 
HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap = new 
HashMap<>();
@@ -341,11 +345,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testAcknowledgementCommitCallbackOnClose() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap = new 
HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap = new 
HashMap<>();
@@ -372,11 +377,12 @@ public class ShareConsumerTest {
     }
 
     @Flaky("KAFKA-18033")
-    @Test
+    @ClusterTest
     public void testAcknowledgementCommitCallbackInvalidRecordStateException() 
throws Exception {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap = new 
HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap = new 
HashMap<>();
@@ -426,11 +432,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testHeaders() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             int numRecords = 1;
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
@@ -454,8 +461,16 @@ public class ShareConsumerTest {
 
     private void testHeadersSerializeDeserialize(Serializer<byte[]> 
serializer, Deserializer<byte[]> deserializer) {
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), serializer);
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(deserializer, new ByteArrayDeserializer(), "group1")) {
+        Map<String, Object> producerConfig = Map.of(
+            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
serializer.getClass().getName()
+        );
+
+        Map<String, Object> consumerConfig = Map.of(
+            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
deserializer.getClass().getName()
+        );
+
+        try (Producer<byte[], byte[]> producer = 
createProducer(producerConfig);
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1", consumerConfig)) {
 
             int numRecords = 1;
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
@@ -469,20 +484,22 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testHeadersSerializerDeserializer() {
+        setup();
         testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), 
new BaseConsumerTest.DeserializerImpl());
         verifyShareGroupStateTopicRecordsProduced();
     }
 
-    @Test
+    @ClusterTest
     public void testMaxPollRecords() {
+        setup();
         int numRecords = 10000;
         int maxPollRecords = 2;
 
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
-                 "group1", 
Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
String.valueOf(maxPollRecords)))) {
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1",
+            Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
String.valueOf(maxPollRecords)))) {
 
             long startingTimestamp = System.currentTimeMillis();
             produceMessagesWithTimestamp(numRecords, startingTimestamp);
@@ -508,12 +525,13 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testControlRecordsSkipped() throws Exception {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> transactionalProducer = 
createProducer(new ByteArraySerializer(), new ByteArraySerializer(), "T1");
-             KafkaProducer<byte[], byte[]> nonTransactionalProducer = 
createProducer(new ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> transactionalProducer = 
createProducer("T1");
+             Producer<byte[], byte[]> nonTransactionalProducer = 
createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
 
@@ -553,11 +571,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testExplicitAcknowledgeSuccess() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -574,11 +593,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testExplicitAcknowledgeCommitSuccess() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -597,12 +617,13 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testExplicitAcknowledgementCommitAsync() throws 
InterruptedException {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
-             KafkaShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1");
+             ShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             ProducerRecord<byte[], byte[]> record2 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
@@ -653,11 +674,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             ProducerRecord<byte[], byte[]> record2 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
@@ -716,11 +738,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testExplicitAcknowledgeReleasePollAccept() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -739,11 +762,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testExplicitAcknowledgeReleaseAccept() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -759,11 +783,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testExplicitAcknowledgeReleaseClose() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -777,11 +802,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testExplicitAcknowledgeThrowsNotInBatch() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -799,11 +825,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testImplicitAcknowledgeFailsExplicit() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -820,11 +847,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testImplicitAcknowledgeCommitSync() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -843,11 +871,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testImplicitAcknowledgementCommitAsync() throws 
InterruptedException {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             ProducerRecord<byte[], byte[]> record2 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
@@ -882,14 +911,19 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testFetchRecordLargerThanMaxPartitionFetchBytes() throws 
Exception {
+        setup();
         int maxPartitionFetchBytes = 10000;
 
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
-                 "group1", 
Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
String.valueOf(maxPartitionFetchBytes)))) {
+        try (
+            Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                
Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
String.valueOf(maxPartitionFetchBytes))
+            )
+        ) {
 
             ProducerRecord<byte[], byte[]> smallRecord = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             ProducerRecord<byte[], byte[]> bigRecord = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), new 
byte[maxPartitionFetchBytes]);
@@ -904,13 +938,14 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testMultipleConsumersWithDifferentGroupIds() throws 
InterruptedException {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         alterShareAutoOffsetReset("group2", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
-             KafkaShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group2")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1");
+             ShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer("group2")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
 
@@ -955,12 +990,13 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testMultipleConsumersInGroupSequentialConsumption() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
-             KafkaShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1");
+             ShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             shareConsumer1.subscribe(Collections.singleton(tp.topic()));
@@ -992,9 +1028,10 @@ public class ShareConsumerTest {
     }
 
     @Flaky("KAFKA-18033")
-    @Test
+    @ClusterTest
     public void testMultipleConsumersInGroupConcurrentConsumption()
             throws InterruptedException, ExecutionException, TimeoutException {
+        setup();
         AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
 
         int consumerCount = 4;
@@ -1026,9 +1063,10 @@ public class ShareConsumerTest {
         assertEquals(producerCount * messagesPerProducer, totalResult);
     }
 
-    @Test
+    @ClusterTest
     public void testMultipleConsumersInMultipleGroupsConcurrentConsumption()
             throws ExecutionException, InterruptedException, TimeoutException {
+        setup();
         AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0);
         AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0);
         AtomicInteger totalMessagesConsumedGroup3 = new AtomicInteger(0);
@@ -1089,12 +1127,13 @@ public class ShareConsumerTest {
         verifyShareGroupStateTopicRecordsProduced();
     }
 
-    @Test
+    @ClusterTest
     public void testConsumerCloseInGroupSequential() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
-             KafkaShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1");
+             ShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             shareConsumer1.subscribe(Collections.singleton(tp.topic()));
@@ -1136,9 +1175,10 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testMultipleConsumersInGroupFailureConcurrentConsumption()
             throws InterruptedException, ExecutionException, TimeoutException {
+        setup();
         AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
 
         int consumerCount = 4;
@@ -1181,11 +1221,12 @@ public class ShareConsumerTest {
         verifyShareGroupStateTopicRecordsProduced();
     }
 
-    @Test
+    @ClusterTest
     public void testAcquisitionLockTimeoutOnConsumer() throws 
InterruptedException {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> producerRecord1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null,
                 "key_1".getBytes(), "value_1".getBytes());
@@ -1242,14 +1283,15 @@ public class ShareConsumerTest {
     }
 
     /**
-     * Test to verify that the acknowledgement commit callback cannot invoke 
methods of KafkaShareConsumer.
+     * Test to verify that the acknowledgement commit callback cannot invoke 
methods of ShareConsumer.
      * The exception thrown is verified in {@link 
TestableAcknowledgementCommitCallbackWithShareConsumer}
      */
-    @Test
+    @ClusterTest
     public void 
testAcknowledgementCommitCallbackCallsShareConsumerDisallowed() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -1258,7 +1300,7 @@ public class ShareConsumerTest {
             shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallbackWithShareConsumer<>(shareConsumer));
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
-            // The acknowledgment commit callback will try to call a method of 
KafkaShareConsumer
+            // The acknowledgment commit callback will try to call a method of 
ShareConsumer
             shareConsumer.poll(Duration.ofMillis(5000));
             // The second poll sends the acknowledgements implicitly.
             // The acknowledgement commit callback will be called and the 
exception is thrown.
@@ -1269,15 +1311,15 @@ public class ShareConsumerTest {
     }
 
     private class TestableAcknowledgementCommitCallbackWithShareConsumer<K, V> 
implements AcknowledgementCommitCallback {
-        private final KafkaShareConsumer<K, V> shareConsumer;
+        private final ShareConsumer<K, V> shareConsumer;
 
-        
TestableAcknowledgementCommitCallbackWithShareConsumer(KafkaShareConsumer<K, V> 
shareConsumer) {
+        
TestableAcknowledgementCommitCallbackWithShareConsumer(ShareConsumer<K, V> 
shareConsumer) {
             this.shareConsumer = shareConsumer;
         }
 
         @Override
         public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, 
Exception exception) {
-            // Accessing methods of KafkaShareConsumer should throw an 
exception.
+            // Accessing methods of ShareConsumer should throw an exception.
             assertThrows(IllegalStateException.class, shareConsumer::close);
             assertThrows(IllegalStateException.class, () -> 
shareConsumer.subscribe(Collections.singleton(tp.topic())));
             assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(5000)));
@@ -1285,20 +1327,21 @@ public class ShareConsumerTest {
     }
 
     /**
-     * Test to verify that the acknowledgement commit callback can invoke 
KafkaShareConsumer.wakeup() and it
+     * Test to verify that the acknowledgement commit callback can invoke 
ShareConsumer.wakeup() and it
      * wakes up the enclosing poll.
      */
-    @Test
+    @ClusterTest
     public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup() 
throws InterruptedException {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
             producer.flush();
 
-            // The acknowledgment commit callback will try to call a method of 
KafkaShareConsumer
+            // The acknowledgment commit callback will try to call a method of 
ShareConsumer
             shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallbackWakeup<>(shareConsumer));
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
@@ -1324,9 +1367,9 @@ public class ShareConsumerTest {
     }
 
     private static class TestableAcknowledgementCommitCallbackWakeup<K, V> 
implements AcknowledgementCommitCallback {
-        private final KafkaShareConsumer<K, V> shareConsumer;
+        private final ShareConsumer<K, V> shareConsumer;
 
-        TestableAcknowledgementCommitCallbackWakeup(KafkaShareConsumer<K, V> 
shareConsumer) {
+        TestableAcknowledgementCommitCallbackWakeup(ShareConsumer<K, V> 
shareConsumer) {
             this.shareConsumer = shareConsumer;
         }
 
@@ -1340,11 +1383,12 @@ public class ShareConsumerTest {
      * Test to verify that the acknowledgement commit callback can throw an 
exception, and it is propagated
      * to the caller of poll().
      */
-    @Test
+    @ClusterTest
     public void testAcknowledgementCommitCallbackThrowsException() throws 
InterruptedException {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -1377,13 +1421,14 @@ public class ShareConsumerTest {
     }
 
     /**
-     * Test to verify that calling Thread.interrupt() before 
KafkaShareConsumer.poll(Duration)
+     * Test to verify that calling Thread.interrupt() before 
ShareConsumer.poll(Duration)
      * causes it to throw InterruptException
      */
-    @Test
+    @ClusterTest
     public void testPollThrowsInterruptExceptionIfInterrupted() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
@@ -1404,10 +1449,11 @@ public class ShareConsumerTest {
      * Test to verify that InvalidTopicException is thrown if the consumer 
subscribes
      * to an invalid topic.
      */
-    @Test
+    @ClusterTest
     public void testSubscribeOnInvalidTopicThrowsInvalidTopicException() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             shareConsumer.subscribe(Collections.singleton("topic abc"));
 
@@ -1421,11 +1467,12 @@ public class ShareConsumerTest {
      * Test to ensure that a wakeup when records are buffered doesn't prevent 
the records
      * being returned on the next poll.
      */
-    @Test
+    @ClusterTest
     public void testWakeupWithFetchedRecordsAvailable() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -1442,11 +1489,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testSubscriptionFollowedByTopicCreation() throws 
InterruptedException {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             String topic = "foo";
             shareConsumer.subscribe(Collections.singleton(topic));
@@ -1471,16 +1519,17 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testSubscriptionAndPollFollowedByTopicDeletion() throws 
InterruptedException, ExecutionException {
+        setup();
         String topic1 = "bar";
         String topic2 = "baz";
         createTopic(topic1);
         createTopic(topic2);
 
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> recordTopic1 = new 
ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes());
             ProducerRecord<byte[], byte[]> recordTopic2 = new 
ProducerRecord<>(topic2, 0, null, "key".getBytes(), "value".getBytes());
@@ -1513,12 +1562,16 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testLsoMovementByRecordsDeletion() {
+        setup();
         String groupId = "group1";
 
         alterShareAutoOffsetReset(groupId, "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
+        try (
+            Producer<byte[], byte[]> producer = createProducer();
+            Admin adminClient = createAdminClient()
+        ) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), 0, null, "key".getBytes(), "value".getBytes());
 
@@ -1555,10 +1608,11 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testShareAutoOffsetResetDefaultValue() {
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
-             KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
+        setup();
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1");
+             Producer<byte[], byte[]> producer = createProducer()) {
 
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
@@ -1581,11 +1635,12 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testShareAutoOffsetResetEarliest() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
-             KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1");
+             Producer<byte[], byte[]> producer = createProducer()) {
 
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
@@ -1606,11 +1661,15 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testShareAutoOffsetResetEarliestAfterLsoMovement() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
-             KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
+        try (
+            ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1");
+            Producer<byte[], byte[]> producer = createProducer();
+            Admin adminClient = createAdminClient()
+        ) {
 
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
 
@@ -1630,13 +1689,14 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue() {
+        setup();
         alterShareAutoOffsetReset("group1", "earliest");
         alterShareAutoOffsetReset("group2", "latest");
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumerEarliest = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
-             KafkaShareConsumer<byte[], byte[]> shareConsumerLatest = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group2");
-             KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
+        try (ShareConsumer<byte[], byte[]> shareConsumerEarliest = 
createShareConsumer("group1");
+             ShareConsumer<byte[], byte[]> shareConsumerLatest = 
createShareConsumer("group2");
+             Producer<byte[], byte[]> producer = createProducer()) {
 
             shareConsumerEarliest.subscribe(Collections.singleton(tp.topic()));
 
@@ -1670,13 +1730,14 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testShareAutoOffsetResetByDuration() throws Exception {
+        setup();
         // Set auto offset reset to 1 hour before current time
         alterShareAutoOffsetReset("group1", "by_duration:PT1H");
         
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
-             KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1");
+             Producer<byte[], byte[]> producer = createProducer()) {
 
             long currentTime = System.currentTimeMillis();
             long twoHoursAgo = currentTime - TimeUnit.HOURS.toMillis(2);
@@ -1711,8 +1772,8 @@ public class ShareConsumerTest {
         // Set the auto offset reset to 3 hours before current time
         // so the consumer should consume all messages (3 records)
         alterShareAutoOffsetReset("group2", "by_duration:PT3H");
-        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group2");
-             KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
+        try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group2");
+             Producer<byte[], byte[]> producer = createProducer()) {
 
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
             List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, 3);
@@ -1721,29 +1782,32 @@ public class ShareConsumerTest {
         }
     }
 
-    @Test
+    @ClusterTest
     public void testShareAutoOffsetResetByDurationInvalidFormat() throws 
Exception {
+        setup();
         // Test invalid duration format
         ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, "group1");
         Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();
         
         // Test invalid duration format
-        alterEntries.put(configResource, List.of(new AlterConfigOp(new 
ConfigEntry(
-            GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:1h"), 
AlterConfigOp.OpType.SET)));
-        ExecutionException e1 = assertThrows(ExecutionException.class, () -> 
-            adminClient.incrementalAlterConfigs(alterEntries).all().get());
-        assertInstanceOf(InvalidConfigurationException.class, e1.getCause());
+        try (Admin adminClient = createAdminClient()) {
+            alterEntries.put(configResource, List.of(new AlterConfigOp(new 
ConfigEntry(
+                GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:1h"), 
AlterConfigOp.OpType.SET)));
+            ExecutionException e1 = assertThrows(ExecutionException.class, () 
->
+                adminClient.incrementalAlterConfigs(alterEntries).all().get());
+            assertInstanceOf(InvalidConfigurationException.class, 
e1.getCause());
 
-        // Test negative duration
-        alterEntries.put(configResource, List.of(new AlterConfigOp(new 
ConfigEntry(
-            GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:-PT1H"), 
AlterConfigOp.OpType.SET)));
-        ExecutionException e2 = assertThrows(ExecutionException.class, () -> 
-            adminClient.incrementalAlterConfigs(alterEntries).all().get());
-        assertInstanceOf(InvalidConfigurationException.class, e2.getCause());
+            // Test negative duration
+            alterEntries.put(configResource, List.of(new AlterConfigOp(new 
ConfigEntry(
+                GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, 
"by_duration:-PT1H"), AlterConfigOp.OpType.SET)));
+            ExecutionException e2 = assertThrows(ExecutionException.class, () 
->
+                adminClient.incrementalAlterConfigs(alterEntries).all().get());
+            assertInstanceOf(InvalidConfigurationException.class, 
e2.getCause());
+        }
     }
 
     private int produceMessages(int messageCount) {
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
+        try (Producer<byte[], byte[]> producer = createProducer()) {
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             IntStream.range(0, messageCount).forEach(__ -> 
producer.send(record));
             producer.flush();
@@ -1752,7 +1816,7 @@ public class ShareConsumerTest {
     }
 
     private void produceMessagesWithTimestamp(int messageCount, long 
startingTimestamp) {
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
+        try (Producer<byte[], byte[]> producer = createProducer()) {
             for (int i = 0; i < messageCount; i++) {
                 ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), startingTimestamp + i,
                     ("key " + i).getBytes(), ("value " + i).getBytes());
@@ -1769,8 +1833,8 @@ public class ShareConsumerTest {
                                  int maxPolls,
                                  boolean commit) {
         return assertDoesNotThrow(() -> {
-            try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(
-                    new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
groupId)) {
+            try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(
+                    groupId)) {
                 shareConsumer.subscribe(Collections.singleton(tp.topic()));
                 return consumeMessages(shareConsumer, totalMessagesConsumed, 
totalMessages, consumerNumber, maxPolls, commit);
             }
@@ -1785,8 +1849,8 @@ public class ShareConsumerTest {
                                  boolean commit,
                                  int maxFetchBytes) {
         return assertDoesNotThrow(() -> {
-            try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(
-                    new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
groupId,
+            try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(
+                    groupId,
                     Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
maxFetchBytes))) {
                 shareConsumer.subscribe(Collections.singleton(tp.topic()));
                 return consumeMessages(shareConsumer, totalMessagesConsumed, 
totalMessages, consumerNumber, maxPolls, commit);
@@ -1794,7 +1858,7 @@ public class ShareConsumerTest {
         }, "Consumer " + consumerNumber + " failed with exception");
     }
 
-    private int consumeMessages(KafkaShareConsumer<byte[], byte[]> consumer,
+    private int consumeMessages(ShareConsumer<byte[], byte[]> consumer,
                                  AtomicInteger totalMessagesConsumed,
                                  int totalMessages,
                                  int consumerNumber,
@@ -1827,7 +1891,7 @@ public class ShareConsumerTest {
         }, "Consumer " + consumerNumber + " failed with exception");
     }
 
-    private <K, V> List<ConsumerRecord<K, V>> 
consumeRecords(KafkaShareConsumer<K, V> consumer,
+    private <K, V> List<ConsumerRecord<K, V>> consumeRecords(ShareConsumer<K, 
V> consumer,
                                                              int numRecords) {
         ArrayList<ConsumerRecord<K, V>> accumulatedRecords = new ArrayList<>();
         long startTimeMs = System.currentTimeMillis();
@@ -1841,58 +1905,55 @@ public class ShareConsumerTest {
     }
 
     private void createTopic(String topicName) {
-        Properties props = cluster.clientProperties();
         assertDoesNotThrow(() -> {
-            try (Admin admin = Admin.create(props)) {
+            try (Admin admin = createAdminClient()) {
                 admin.createTopics(Collections.singleton(new NewTopic(topicName, 1, (short) 1))).all().get();
             }
         }, "Failed to create topic");
     }
 
     private void deleteTopic(String topicName) {
-        Properties props = cluster.clientProperties();
         assertDoesNotThrow(() -> {
-            try (Admin admin = Admin.create(props)) {
+            try (Admin admin = createAdminClient()) {
                 admin.deleteTopics(Collections.singleton(topicName)).all().get();
             }
         }, "Failed to delete topic");
     }
 
     private Admin createAdminClient() {
-        Properties props = cluster.clientProperties();
-        return Admin.create(props);
+        return cluster.admin();
     }
 
-    private <K, V> KafkaProducer<K, V> createProducer(Serializer<K> keySerializer,
-                                                      Serializer<V> valueSerializer) {
-        Properties props = cluster.clientProperties();
-        return new KafkaProducer<>(props, keySerializer, valueSerializer);
+    private <K, V> Producer<K, V> createProducer() {
+        return createProducer(Map.of());
     }
 
-    private <K, V> KafkaProducer<K, V> createProducer(Serializer<K> keySerializer,
-                                                      Serializer<V> valueSerializer,
-                                                      String transactionalId) {
-        Properties props = cluster.clientProperties();
-        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        return new KafkaProducer<>(props, keySerializer, valueSerializer);
+    private <K, V> Producer<K, V> createProducer(Map<String, Object> config) {
+        return cluster.producer(config);
     }
 
-    private <K, V> KafkaShareConsumer<K, V> createShareConsumer(Deserializer<K> keyDeserializer,
-                                                                Deserializer<V> valueDeserializer,
-                                                                String groupId) {
-        Properties props = cluster.clientProperties();
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
-        return new KafkaShareConsumer<>(props, keyDeserializer, valueDeserializer);
+    private <K, V> Producer<K, V> createProducer(String transactionalId) {
+        return createProducer(
+            Map.of(
+                ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId
+            )
+        );
     }
 
-    private <K, V> KafkaShareConsumer<K, V> createShareConsumer(Deserializer<K> keyDeserializer,
-                                                                Deserializer<V> valueDeserializer,
-                                                                String groupId,
-                                                                Map<?, ?> additionalProperties) {
-        Properties props = cluster.clientProperties();
-        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    private <K, V> ShareConsumer<K, V> createShareConsumer(String groupId) {
+        return createShareConsumer(groupId, Map.of());
+    }
+
+    private <K, V> ShareConsumer<K, V> createShareConsumer(
+        String groupId,
+        Map<?, ?> additionalProperties
+    ) {
+        Properties props = new Properties();
         props.putAll(additionalProperties);
-        return new KafkaShareConsumer<>(props, keyDeserializer, valueDeserializer);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        Map<String, Object> conf = new HashMap<>();
+        props.forEach((k, v) -> conf.put((String) k, v));
+        return cluster.shareConsumer(conf);
     }
 
     private void warmup() throws InterruptedException {
@@ -1901,8 +1962,8 @@ public class ShareConsumerTest {
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(warmupTp.topic(), warmupTp.partition(), null, "key".getBytes(), "value".getBytes());
         Set<String> subscription = Collections.singleton(warmupTp.topic());
         alterShareAutoOffsetReset("warmupgroup1", "earliest");
-        try (KafkaProducer<byte[], byte[]> producer = createProducer(new ByteArraySerializer(), new ByteArraySerializer());
-             KafkaShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "warmupgroup1")) {
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("warmupgroup1")) {
 
             producer.send(record);
             producer.flush();
@@ -1921,12 +1982,7 @@ public class ShareConsumerTest {
 
     private void verifyShareGroupStateTopicRecordsProduced() {
         try {
-            Map<String, Object> consumerConfigs = new HashMap<>();
-            consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
-            consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-            consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-
-            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfigs)) {
+            try (Consumer<byte[], byte[]> consumer = cluster.consumer()) {
                 consumer.assign(sgsTopicPartitions);
                 consumer.seekToBeginning(sgsTopicPartitions);
                 Set<ConsumerRecord<byte[], byte[]>> records = new HashSet<>();
@@ -1953,8 +2009,10 @@ public class ShareConsumerTest {
         alterEntries.put(configResource, List.of(new AlterConfigOp(new ConfigEntry(
             GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, newValue), AlterConfigOp.OpType.SET)));
         AlterConfigsOptions alterOptions = new AlterConfigsOptions();
-        assertDoesNotThrow(() -> adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
+        try (Admin adminClient = createAdminClient()) {
+            assertDoesNotThrow(() -> adminClient.incrementalAlterConfigs(alterEntries, alterOptions)
                 .all()
                 .get(60, TimeUnit.SECONDS), "Failed to alter configs");
+        }
     }
 }
diff --git a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
index 1c8551bf9e6..eb97c2c92a1 100644
--- a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
+++ b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterInstance.java
@@ -32,6 +32,8 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.clients.consumer.ShareConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -188,6 +190,20 @@ public interface ClusterInstance {
         return consumer(Map.of());
     }
 
+    default <K, V> ShareConsumer<K, V> shareConsumer() {
+        return shareConsumer(Map.of());
+    }
+
+    default <K, V> ShareConsumer<K, V> shareConsumer(Map<String, Object> configs) {
+        Map<String, Object> props = new HashMap<>(configs);
+        props.putIfAbsent(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + TestUtils.randomString(5));
+        props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        return new KafkaShareConsumer<>(setClientSaslConfig(props));
+    }
+
     default Admin admin(Map<String, Object> configs, boolean usingBootstrapControllers) {
         Map<String, Object> props = new HashMap<>(configs);
         if (usingBootstrapControllers) {

Reply via email to