This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e3a56f3162c KAFKA-18513: Validate share state topic records produced in tests. (#18521)
e3a56f3162c is described below

commit e3a56f3162c681e6c81bec4d09b45fe00347d4d3
Author: Sushant Mahajan <smaha...@confluent.io>
AuthorDate: Wed Jan 15 21:40:07 2025 +0530

    KAFKA-18513: Validate share state topic records produced in tests. (#18521)
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../java/kafka/test/api/ShareConsumerTest.java     | 353 +++++++++++----------
 1 file changed, 189 insertions(+), 164 deletions(-)

diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index 6551f80b5aa..237fe34bbe7 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.KafkaShareConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -44,6 +45,7 @@ import 
org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.internals.Topic;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -59,10 +61,9 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -91,10 +92,12 @@ import static 
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 @Timeout(1200)
 @Tag("integration")
@@ -103,17 +106,12 @@ public class ShareConsumerTest {
     private final TopicPartition tp = new TopicPartition("topic", 0);
     private final TopicPartition tp2 = new TopicPartition("topic2", 0);
     private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
-    private static final String DEFAULT_STATE_PERSISTER = 
"org.apache.kafka.server.share.persister.DefaultStatePersister";
-    private static final String NO_OP_PERSISTER = 
"org.apache.kafka.server.share.persister.NoOpShareStatePersister";
+    private List<TopicPartition> sgsTopicPartitions;
 
     private Admin adminClient;
 
     @BeforeEach
     public void createCluster(TestInfo testInfo) throws Exception {
-        String persisterClassName = NO_OP_PERSISTER;
-        if (testInfo.getDisplayName().contains(".persister=")) {
-            persisterClassName = testInfo.getDisplayName().split("=")[1];
-        }
         cluster = new KafkaClusterTestKit.Builder(
             new TestKitNodes.Builder()
                 .setNumBrokerNodes(1)
@@ -123,7 +121,6 @@ public class ShareConsumerTest {
             .setConfigProp("group.coordinator.rebalance.protocols", 
"classic,consumer,share")
             .setConfigProp("group.share.enable", "true")
             .setConfigProp("group.share.partition.max.record.locks", "10000")
-            .setConfigProp("group.share.persister.class.name", 
persisterClassName)
             .setConfigProp("group.share.record.lock.duration.ms", "15000")
             .setConfigProp("offsets.topic.replication.factor", "1")
             .setConfigProp("share.coordinator.state.topic.min.isr", "1")
@@ -140,6 +137,9 @@ public class ShareConsumerTest {
         createTopic("topic");
         createTopic("topic2");
         adminClient = createAdminClient();
+        sgsTopicPartitions = IntStream.range(0, 3)
+            .mapToObj(part -> new 
TopicPartition(Topic.SHARE_GROUP_STATE_TOPIC_NAME, part))
+            .toList();
         warmup();
     }
 
@@ -149,9 +149,8 @@ public class ShareConsumerTest {
         cluster.close();
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testPollNoSubscribeFails(String persister) {
+    @Test
+    public void testPollNoSubscribeFails() {
         try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
             assertEquals(Collections.emptySet(), shareConsumer.subscription());
             // "Consumer is not subscribed to any topics."
@@ -159,9 +158,8 @@ public class ShareConsumerTest {
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testSubscribeAndPollNoRecords(String persister) {
+    @Test
+    public void testSubscribeAndPollNoRecords() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
             Set<String> subscription = Collections.singleton(tp.topic());
@@ -169,12 +167,12 @@ public class ShareConsumerTest {
             assertEquals(subscription, shareConsumer.subscription());
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
             assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testSubscribePollUnsubscribe(String persister) {
+    @Test
+    public void testSubscribePollUnsubscribe() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
             Set<String> subscription = Collections.singleton(tp.topic());
@@ -184,12 +182,12 @@ public class ShareConsumerTest {
             shareConsumer.unsubscribe();
             assertEquals(Collections.emptySet(), shareConsumer.subscription());
             assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testSubscribePollSubscribe(String persister) {
+    @Test
+    public void testSubscribePollSubscribe() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
             Set<String> subscription = Collections.singleton(tp.topic());
@@ -201,12 +199,12 @@ public class ShareConsumerTest {
             assertEquals(subscription, shareConsumer.subscription());
             records = shareConsumer.poll(Duration.ofMillis(500));
             assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testSubscribeUnsubscribePollFails(String persister) {
+    @Test
+    public void testSubscribeUnsubscribePollFails() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
             Set<String> subscription = Collections.singleton(tp.topic());
@@ -218,12 +216,12 @@ public class ShareConsumerTest {
             // "Consumer is not subscribed to any topics."
             assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
             assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();   // due to leader 
epoch in read
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testSubscribeSubscribeEmptyPollFails(String persister) {
+    @Test
+    public void testSubscribeSubscribeEmptyPollFails() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
             Set<String> subscription = Collections.singleton(tp.topic());
@@ -235,12 +233,12 @@ public class ShareConsumerTest {
             // "Consumer is not subscribed to any topics."
             assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
             assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();   // due to leader 
epoch in read
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testSubscriptionAndPoll(String persister) {
+    @Test
+    public void testSubscriptionAndPoll() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -251,12 +249,12 @@ public class ShareConsumerTest {
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testSubscriptionAndPollMultiple(String persister) {
+    @Test
+    public void testSubscriptionAndPollMultiple() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -273,12 +271,12 @@ public class ShareConsumerTest {
             producer.send(record);
             records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testAcknowledgementSentOnSubscriptionChange(String persister) 
throws ExecutionException, InterruptedException {
+    @Test
+    public void testAcknowledgementSentOnSubscriptionChange() throws 
ExecutionException, InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -309,12 +307,12 @@ public class ShareConsumerTest {
             // Verifying if the callback was invoked without exceptions for 
the partitions for both topics.
             assertNull(partitionExceptionMap.get(tp));
             assertNull(partitionExceptionMap.get(tp2));
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void 
testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String persister) 
throws Exception {
+    @Test
+    public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement() 
throws Exception {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -339,12 +337,12 @@ public class ShareConsumerTest {
 
             // We expect null exception as the acknowledgment error code is 
null.
             assertNull(partitionExceptionMap.get(tp));
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testAcknowledgementCommitCallbackOnClose(String persister) {
+    @Test
+    public void testAcknowledgementCommitCallbackOnClose() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -369,13 +367,13 @@ public class ShareConsumerTest {
             // We expect null exception as the acknowledgment error code is 
null.
             assertTrue(partitionExceptionMap.containsKey(tp));
             assertNull(partitionExceptionMap.get(tp));
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
     @Flaky("KAFKA-18033")
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void 
testAcknowledgementCommitCallbackInvalidRecordStateException(String persister) 
throws Exception {
+    @Test
+    public void testAcknowledgementCommitCallbackInvalidRecordStateException() 
throws Exception {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -428,9 +426,8 @@ public class ShareConsumerTest {
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testHeaders(String persister) {
+    @Test
+    public void testHeaders() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -451,6 +448,7 @@ public class ShareConsumerTest {
                 if (header != null)
                     assertEquals("headerValue", new String(header.value()));
             }
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
@@ -471,15 +469,14 @@ public class ShareConsumerTest {
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testHeadersSerializerDeserializer(String persister) {
+    @Test
+    public void testHeadersSerializerDeserializer() {
         testHeadersSerializeDeserialize(new BaseConsumerTest.SerializerImpl(), 
new BaseConsumerTest.DeserializerImpl());
+        verifyShareGroupStateTopicRecordsProduced();
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testMaxPollRecords(String persister) {
+    @Test
+    public void testMaxPollRecords() {
         int numRecords = 10000;
         int maxPollRecords = 2;
 
@@ -507,12 +504,12 @@ public class ShareConsumerTest {
 
                 i++;
             }
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testControlRecordsSkipped(String persister) throws Exception {
+    @Test
+    public void testControlRecordsSkipped() throws Exception {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> transactionalProducer = 
createProducer(new ByteArraySerializer(), new ByteArraySerializer(), "T1");
              KafkaProducer<byte[], byte[]> nonTransactionalProducer = 
createProducer(new ByteArraySerializer(), new ByteArraySerializer());
@@ -552,12 +549,12 @@ public class ShareConsumerTest {
 
             records = shareConsumer.poll(Duration.ofMillis(500));
             assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testExplicitAcknowledgeSuccess(String persister) {
+    @Test
+    public void testExplicitAcknowledgeSuccess() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -573,12 +570,12 @@ public class ShareConsumerTest {
             producer.send(record);
             records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testExplicitAcknowledgeCommitSuccess(String persister) {
+    @Test
+    public void testExplicitAcknowledgeCommitSuccess() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -596,12 +593,12 @@ public class ShareConsumerTest {
             assertEquals(1, result.size());
             records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testExplicitAcknowledgementCommitAsync(String persister) 
throws InterruptedException {
+    @Test
+    public void testExplicitAcknowledgementCommitAsync() throws 
InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
@@ -652,12 +649,12 @@ public class ShareConsumerTest {
             }, 30000, 100L, () -> "Didn't receive call to callback");
 
             assertNull(partitionExceptionMap1.get(tp));
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testExplicitAcknowledgementCommitAsyncPartialBatch(String 
persister) {
+    @Test
+    public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -715,12 +712,12 @@ public class ShareConsumerTest {
 
             assertTrue(partitionExceptionMap.containsKey(tp));
             assertNull(partitionExceptionMap.get(tp));
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testExplicitAcknowledgeReleasePollAccept(String persister) {
+    @Test
+    public void testExplicitAcknowledgeReleasePollAccept() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -738,12 +735,12 @@ public class ShareConsumerTest {
             records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
             records = shareConsumer.poll(Duration.ofMillis(500));
             assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testExplicitAcknowledgeReleaseAccept(String persister) {
+    @Test
+    public void testExplicitAcknowledgeReleaseAccept() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -758,12 +755,12 @@ public class ShareConsumerTest {
             records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
             records = shareConsumer.poll(Duration.ofMillis(500));
             assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testExplicitAcknowledgeReleaseClose(String persister) {
+    @Test
+    public void testExplicitAcknowledgeReleaseClose() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -776,12 +773,12 @@ public class ShareConsumerTest {
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testExplicitAcknowledgeThrowsNotInBatch(String persister) {
+    @Test
+    public void testExplicitAcknowledgeThrowsNotInBatch() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -798,12 +795,12 @@ public class ShareConsumerTest {
             records = shareConsumer.poll(Duration.ofMillis(500));
             assertEquals(0, records.count());
             assertThrows(IllegalStateException.class, () -> 
shareConsumer.acknowledge(consumedRecord));
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testImplicitAcknowledgeFailsExplicit(String persister) {
+    @Test
+    public void testImplicitAcknowledgeFailsExplicit() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -819,12 +816,12 @@ public class ShareConsumerTest {
             records = shareConsumer.poll(Duration.ofMillis(500));
             assertEquals(0, records.count());
             assertThrows(IllegalStateException.class, () -> 
shareConsumer.acknowledge(consumedRecord));
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testImplicitAcknowledgeCommitSync(String persister) {
+    @Test
+    public void testImplicitAcknowledgeCommitSync() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -842,12 +839,12 @@ public class ShareConsumerTest {
             assertEquals(0, result.size());
             records = shareConsumer.poll(Duration.ofMillis(500));
             assertEquals(0, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testImplicitAcknowledgementCommitAsync(String persister) 
throws InterruptedException {
+    @Test
+    public void testImplicitAcknowledgementCommitAsync() throws 
InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -881,12 +878,12 @@ public class ShareConsumerTest {
             }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Acknowledgement commit 
callback did not receive the response yet");
 
             assertNull(partitionExceptionMap1.get(tp));
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testFetchRecordLargerThanMaxPartitionFetchBytes(String 
persister) throws Exception {
+    @Test
+    public void testFetchRecordLargerThanMaxPartitionFetchBytes() throws 
Exception {
         int maxPartitionFetchBytes = 10000;
 
         alterShareAutoOffsetReset("group1", "earliest");
@@ -903,12 +900,12 @@ public class ShareConsumerTest {
 
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(2, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testMultipleConsumersWithDifferentGroupIds(String persister) 
throws InterruptedException {
+    @Test
+    public void testMultipleConsumersWithDifferentGroupIds() throws 
InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
         alterShareAutoOffsetReset("group2", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
@@ -954,12 +951,12 @@ public class ShareConsumerTest {
                 int records2 = 
shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000)).count());
                 return records1 == 3 && records2 == 5;
             }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records for 
both consumers for the last batch");
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testMultipleConsumersInGroupSequentialConsumption(String 
persister) {
+    @Test
+    public void testMultipleConsumersInGroupSequentialConsumption() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
@@ -995,9 +992,8 @@ public class ShareConsumerTest {
     }
 
     @Flaky("KAFKA-18033")
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testMultipleConsumersInGroupConcurrentConsumption(String 
persister)
+    @Test
+    public void testMultipleConsumersInGroupConcurrentConsumption()
             throws InterruptedException, ExecutionException, TimeoutException {
         AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
 
@@ -1030,9 +1026,8 @@ public class ShareConsumerTest {
         assertEquals(producerCount * messagesPerProducer, totalResult);
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void 
testMultipleConsumersInMultipleGroupsConcurrentConsumption(String persister)
+    @Test
+    public void testMultipleConsumersInMultipleGroupsConcurrentConsumption()
             throws ExecutionException, InterruptedException, TimeoutException {
         AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0);
         AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0);
@@ -1091,11 +1086,11 @@ public class ShareConsumerTest {
         assertEquals(totalMessagesSent, totalResult2);
         assertEquals(totalMessagesSent, totalResult3);
         assertEquals(totalMessagesSent, actualMessageSent);
+        verifyShareGroupStateTopicRecordsProduced();
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testConsumerCloseInGroupSequential(String persister) {
+    @Test
+    public void testConsumerCloseInGroupSequential() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
@@ -1137,12 +1132,12 @@ public class ShareConsumerTest {
             }
             shareConsumer2.close();
             assertEquals(totalMessages, consumer1MessageCount + 
consumer2MessageCount);
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void 
testMultipleConsumersInGroupFailureConcurrentConsumption(String persister)
+    @Test
+    public void testMultipleConsumersInGroupFailureConcurrentConsumption()
             throws InterruptedException, ExecutionException, TimeoutException {
         AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
 
@@ -1183,11 +1178,11 @@ public class ShareConsumerTest {
 
         int totalSuccessResult = 
consumeMessagesFutures.stream().mapToInt(CompletableFuture::join).sum();
         assertEquals(producerCount * messagesPerProducer, totalSuccessResult);
+        verifyShareGroupStateTopicRecordsProduced();
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testAcquisitionLockTimeoutOnConsumer(String persister) throws 
InterruptedException {
+    @Test
+    public void testAcquisitionLockTimeoutOnConsumer() throws 
InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -1242,6 +1237,7 @@ public class ShareConsumerTest {
 
             consumerRecords = shareConsumer.poll(Duration.ofMillis(1000));
             assertEquals(0, consumerRecords.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
@@ -1249,9 +1245,8 @@ public class ShareConsumerTest {
      * Test to verify that the acknowledgement commit callback cannot invoke 
methods of KafkaShareConsumer.
      * The exception thrown is verified in {@link 
TestableAcknowledgementCommitCallbackWithShareConsumer}
      */
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void 
testAcknowledgementCommitCallbackCallsShareConsumerDisallowed(String persister) 
{
+    @Test
+    public void 
testAcknowledgementCommitCallbackCallsShareConsumerDisallowed() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -1269,6 +1264,7 @@ public class ShareConsumerTest {
             // The acknowledgement commit callback will be called and the 
exception is thrown.
             // This is verified inside the onComplete() method implementation.
             shareConsumer.poll(Duration.ofMillis(500));
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
@@ -1292,9 +1288,8 @@ public class ShareConsumerTest {
      * Test to verify that the acknowledgement commit callback can invoke 
KafkaShareConsumer.wakeup() and it
      * wakes up the enclosing poll.
      */
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void 
testAcknowledgementCommitCallbackCallsShareConsumerWakeup(String persister) 
throws InterruptedException {
+    @Test
+    public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup() 
throws InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -1324,6 +1319,7 @@ public class ShareConsumerTest {
                 }
                 return exceptionThrown.get();
             }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected 
exception");
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
@@ -1344,9 +1340,8 @@ public class ShareConsumerTest {
      * Test to verify that the acknowledgement commit callback can throw an 
exception, and it is propagated
      * to the caller of poll().
      */
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testAcknowledgementCommitCallbackThrowsException(String 
persister) throws InterruptedException {
+    @Test
+    public void testAcknowledgementCommitCallbackThrowsException() throws 
InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -1370,6 +1365,7 @@ public class ShareConsumerTest {
                 }
                 return exceptionThrown.get();
             }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive expected 
exception");
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
@@ -1384,9 +1380,8 @@ public class ShareConsumerTest {
      * Test to verify that calling Thread.interrupt() before 
KafkaShareConsumer.poll(Duration)
      * causes it to throw InterruptException
      */
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testPollThrowsInterruptExceptionIfInterrupted(String 
persister) {
+    @Test
+    public void testPollThrowsInterruptExceptionIfInterrupted() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
 
@@ -1409,9 +1404,8 @@ public class ShareConsumerTest {
      * Test to verify that InvalidTopicException is thrown if the consumer 
subscribes
      * to an invalid topic.
      */
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String 
persister) {
+    @Test
+    public void testSubscribeOnInvalidTopicThrowsInvalidTopicException() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
 
@@ -1427,9 +1421,8 @@ public class ShareConsumerTest {
      * Test to ensure that a wakeup when records are buffered doesn't prevent 
the records
      * being returned on the next poll.
      */
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testWakeupWithFetchedRecordsAvailable(String persister) {
+    @Test
+    public void testWakeupWithFetchedRecordsAvailable() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -1445,12 +1438,12 @@ public class ShareConsumerTest {
 
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testSubscriptionFollowedByTopicCreation(String persister) 
throws InterruptedException {
+    @Test
+    public void testSubscriptionFollowedByTopicCreation() throws 
InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
              KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
@@ -1474,12 +1467,12 @@ public class ShareConsumerTest {
             producer.send(record);
             records = shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testSubscriptionAndPollFollowedByTopicDeletion(String 
persister) throws InterruptedException, ExecutionException {
+    @Test
+    public void testSubscriptionAndPollFollowedByTopicDeletion() throws 
InterruptedException, ExecutionException {
         String topic1 = "bar";
         String topic2 = "baz";
         createTopic(topic1);
@@ -1516,12 +1509,12 @@ public class ShareConsumerTest {
             producer.send(recordTopic2).get();
             TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
                 DEFAULT_MAX_WAIT_MS, 100L, () -> "incorrect number of 
records");
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testLsoMovementByRecordsDeletion(String persister) {
+    @Test
+    public void testLsoMovementByRecordsDeletion() {
         String groupId = "group1";
 
         alterShareAutoOffsetReset(groupId, "earliest");
@@ -1558,12 +1551,12 @@ public class ShareConsumerTest {
 
             messageCount = consumeMessages(new AtomicInteger(0), 0, groupId, 
1, 5, true);
             assertEquals(0, messageCount);
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testShareAutoOffsetResetDefaultValue(String persister) {
+    @Test
+    public void testShareAutoOffsetResetDefaultValue() {
         try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
              KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
 
@@ -1584,12 +1577,12 @@ public class ShareConsumerTest {
             records = shareConsumer.poll(Duration.ofMillis(5000));
             // Now the next record should be consumed successfully
             assertEquals(1, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testShareAutoOffsetResetEarliest(String persister) {
+    @Test
+    public void testShareAutoOffsetResetEarliest() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
              KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
@@ -1609,12 +1602,12 @@ public class ShareConsumerTest {
             records = shareConsumer.poll(Duration.ofMillis(5000));
             // The next records should also be consumed successfully
             assertEquals(1, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testShareAutoOffsetResetEarliestAfterLsoMovement(String 
persister) {
+    @Test
+    public void testShareAutoOffsetResetEarliestAfterLsoMovement() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
              KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer())) {
@@ -1633,12 +1626,12 @@ public class ShareConsumerTest {
             int consumedMessageCount = consumeMessages(new AtomicInteger(0), 
5, "group1", 1, 10, true);
             // The records returned belong to offsets 5-9.
             assertEquals(5, consumedMessageCount);
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void 
testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) {
+    @Test
+    public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue() {
         alterShareAutoOffsetReset("group1", "earliest");
         alterShareAutoOffsetReset("group2", "latest");
         try (KafkaShareConsumer<byte[], byte[]> shareConsumerEarliest = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1");
@@ -1673,12 +1666,12 @@ public class ShareConsumerTest {
             records2 = shareConsumerLatest.poll(Duration.ofMillis(5000));
             // The next record should also be consumed successfully by group2
             assertEquals(1, records2.count());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testShareAutoOffsetResetByDuration(String persister) throws 
Exception {
+    @Test
+    public void testShareAutoOffsetResetByDuration() throws Exception {
         // Set auto offset reset to 1 hour before current time
         alterShareAutoOffsetReset("group1", "by_duration:PT1H");
         
@@ -1724,12 +1717,12 @@ public class ShareConsumerTest {
             shareConsumer.subscribe(Collections.singleton(tp.topic()));
             List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, 3);
             assertEquals(3, records.size());
+            verifyShareGroupStateTopicRecordsProduced();
         }
     }
 
-    @ParameterizedTest(name = "{displayName}.persister={0}")
-    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
-    public void testShareAutoOffsetResetByDurationInvalidFormat(String 
persister) throws Exception {
+    @Test
+    public void testShareAutoOffsetResetByDurationInvalidFormat() throws 
Exception {
         // Test invalid duration format
         ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, "group1");
         Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();
@@ -1739,14 +1732,14 @@ public class ShareConsumerTest {
             GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:1h"), 
AlterConfigOp.OpType.SET)));
         ExecutionException e1 = assertThrows(ExecutionException.class, () -> 
             adminClient.incrementalAlterConfigs(alterEntries).all().get());
-        assertTrue(e1.getCause() instanceof InvalidConfigurationException);
+        assertInstanceOf(InvalidConfigurationException.class, e1.getCause());
 
         // Test negative duration
         alterEntries.put(configResource, List.of(new AlterConfigOp(new 
ConfigEntry(
             GroupConfig.SHARE_AUTO_OFFSET_RESET_CONFIG, "by_duration:-PT1H"), 
AlterConfigOp.OpType.SET)));
         ExecutionException e2 = assertThrows(ExecutionException.class, () -> 
             adminClient.incrementalAlterConfigs(alterEntries).all().get());
-        assertTrue(e2.getCause() instanceof InvalidConfigurationException);
+        assertInstanceOf(InvalidConfigurationException.class, e2.getCause());
     }
 
     private int produceMessages(int messageCount) {
@@ -1904,9 +1897,7 @@ public class ShareConsumerTest {
 
     private void warmup() throws InterruptedException {
         createTopic(warmupTp.topic());
-        TestUtils.waitForCondition(() ->
-                
!cluster.brokers().get(0).metadataCache().getAliveBrokerNodes(new 
ListenerName("EXTERNAL")).isEmpty(),
-            DEFAULT_MAX_WAIT_MS, 100L, () -> "cache not up yet");
+        waitForMetadataCache();
         ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(warmupTp.topic(), warmupTp.partition(), null, 
"key".getBytes(), "value".getBytes());
         Set<String> subscription = Collections.singleton(warmupTp.topic());
         alterShareAutoOffsetReset("warmupgroup1", "earliest");
@@ -1922,6 +1913,40 @@ public class ShareConsumerTest {
         }
     }
 
+    private void waitForMetadataCache() throws InterruptedException {
+        TestUtils.waitForCondition(() ->
+                
!cluster.brokers().get(0).metadataCache().getAliveBrokerNodes(new 
ListenerName("EXTERNAL")).isEmpty(),
+            DEFAULT_MAX_WAIT_MS, 100L, () -> "cache not up yet");
+    }
+
+    private void verifyShareGroupStateTopicRecordsProduced() {
+        try {
+            Map<String, Object> consumerConfigs = new HashMap<>();
+            consumerConfigs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
+            consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+            
consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+
+            try (KafkaConsumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(consumerConfigs)) {
+                consumer.assign(sgsTopicPartitions);
+                consumer.seekToBeginning(sgsTopicPartitions);
+                Set<ConsumerRecord<byte[], byte[]>> records = new HashSet<>();
+                TestUtils.waitForCondition(() -> {
+                        ConsumerRecords<byte[], byte[]> msgs = 
consumer.poll(Duration.ofMillis(5000L));
+                        if (msgs.count() > 0) {
+                            
msgs.records(Topic.SHARE_GROUP_STATE_TOPIC_NAME).forEach(records::add);
+                        }
+                        return records.size() > 2; // +2 because of extra 
warmup records
+                    },
+                    30000L,
+                    200L,
+                    () -> "no records produced"
+                );
+            }
+        } catch (InterruptedException e) {
+            fail(e);
+        }
+    }
+
     private void alterShareAutoOffsetReset(String groupId, String newValue) {
         ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.GROUP, groupId);
         Map<ConfigResource, Collection<AlterConfigOp>> alterEntries = new 
HashMap<>();

Reply via email to