This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1da30bdedf4 KAFKA-18900: Experimental share consumer acknowledge mode 
config (#19113)
1da30bdedf4 is described below

commit 1da30bdedf457b507ed9879a78e31c9e4c74026c
Author: Andrew Schofield <aschofi...@confluent.io>
AuthorDate: Thu Mar 6 17:57:11 2025 +0000

    KAFKA-18900: Experimental share consumer acknowledge mode config (#19113)
    
    User testing of the `KafkaShareConsumer` interface has revealed some
    areas which confuse people. One of these is the way that it decides
    whether you want to use implicit or explicit acknowledgement of records
    by observing which calls the application issues. We are taking the
    opportunity to refine the interface before it is finalised.
    
    This PR introduces an experimental configuration called
    `internal.share.acknowledgement.mode` which can be used to make the
    application declare which kind of acknowledgement it wishes to use. We
    plan to try out the configuration, assess whether it has helped, and
    then create a proper consumer configuration that makes this area better.
    That would require many changes to the tests, which explains why this
    initial PR only has a small number of tests.
    
    Reviewers: David Arthur <mum...@gmail.com>
---
 .../kafka/clients/consumer/AcknowledgeType.java    |   4 +
 .../kafka/clients/consumer/ConsumerConfig.java     |  41 +++--
 .../kafka/clients/consumer/KafkaShareConsumer.java |   2 +
 .../kafka/clients/consumer/ShareConsumer.java      |   2 +
 .../clients/consumer/ShareConsumerConfig.java      |  15 +-
 .../consumer/internals/ShareConsumerImpl.java      |  24 ++-
 .../java/kafka/test/api/ShareConsumerTest.java     | 186 +++++++++++++--------
 7 files changed, 184 insertions(+), 90 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
index 14b5415c2a4..b42bc135363 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/AcknowledgeType.java
@@ -20,6 +20,10 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Locale;
 
+/**
+ * The acknowledge type is used with {@link 
KafkaShareConsumer#acknowledge(ConsumerRecord, AcknowledgeType)} to indicate
+ * whether the record was consumed successfully.
+ */
 @InterfaceStability.Evolving
 public enum AcknowledgeType {
     /** The record was consumed successfully. */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index d9cee3ec179..e5917d9d593 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -380,13 +380,28 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String SECURITY_PROVIDERS_CONFIG = 
SecurityConfig.SECURITY_PROVIDERS_CONFIG;
     private static final String SECURITY_PROVIDERS_DOC = 
SecurityConfig.SECURITY_PROVIDERS_DOC;
 
+    /**
+     * <code>share.acknowledgement.mode</code> is being evaluated as a new 
configuration to control the acknowledgement mode
+     * for share consumers. It will be removed or converted to a proper 
configuration before release.
+     * An alternative being considered is 
<code>enable.explicit.share.acknowledgement</code> as a boolean configuration.
+     */
+    public static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = 
"internal.share.acknowledgement.mode";
+    private static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC = 
"Controls the acknowledgement mode for a share consumer." +
+            " If unset, the acknowledgement mode of the consumer is decided by 
the method calls it uses to fetch and commit." +
+            " If set to <code>implicit</code>, the acknowledgement mode of the 
consumer is implicit and it must not" +
+            " use 
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to 
acknowledge delivery of records. Instead," +
+            " delivery is acknowledged implicitly on the next call to poll or 
commit." +
+            " If set to <code>explicit</code>, the acknowledgement mode of the 
consumer is explicit and it must use" +
+            " 
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to 
acknowledge delivery of records.";
+
     private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new 
AtomicInteger(1);
 
     /**
      * A list of configuration keys not supported for CLASSIC protocol.
      */
-    private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = 
Collections.singletonList(
-            GROUP_REMOTE_ASSIGNOR_CONFIG
+    private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = 
List.of(
+            GROUP_REMOTE_ASSIGNOR_CONFIG,
+            INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
     );
 
     /**
@@ -395,7 +410,8 @@ public class ConsumerConfig extends AbstractConfig {
     private static final List<String> CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS = 
List.of(
             PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
             HEARTBEAT_INTERVAL_MS_CONFIG, 
-            SESSION_TIMEOUT_MS_CONFIG
+            SESSION_TIMEOUT_MS_CONFIG,
+            INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
     );
     
     static {
@@ -678,8 +694,13 @@ public class ConsumerConfig extends AbstractConfig {
                                         
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
                                         atLeast(0),
                                         Importance.LOW,
-                                        
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
-
+                                        
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
+                                
.define(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
+                                        Type.STRING,
+                                        null,
+                                        in(null, "implicit", "explicit"),
+                                        Importance.MEDIUM,
+                                        
ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC);
     }
 
     @Override
@@ -689,7 +710,7 @@ public class ConsumerConfig extends AbstractConfig {
         Map<String, Object> refinedConfigs = 
CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
         maybeOverrideClientId(refinedConfigs);
         maybeOverrideEnableAutoCommit(refinedConfigs);
-        checkUnsupportedConfigs();
+        checkUnsupportedConfigsPostProcess();
         return refinedConfigs;
     }
 
@@ -736,16 +757,16 @@ public class ConsumerConfig extends AbstractConfig {
         }
     }
 
-    private void checkUnsupportedConfigs() {
+    protected void checkUnsupportedConfigsPostProcess() {
         String groupProtocol = getString(GROUP_PROTOCOL_CONFIG);
         if (GroupProtocol.CLASSIC.name().equalsIgnoreCase(groupProtocol)) {
-            checkUnsupportedConfigs(GroupProtocol.CLASSIC, 
CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS);
+            checkUnsupportedConfigsPostProcess(GroupProtocol.CLASSIC, 
CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS);
         } else if 
(GroupProtocol.CONSUMER.name().equalsIgnoreCase(groupProtocol)) {
-            checkUnsupportedConfigs(GroupProtocol.CONSUMER, 
CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS);
+            checkUnsupportedConfigsPostProcess(GroupProtocol.CONSUMER, 
CONSUMER_PROTOCOL_UNSUPPORTED_CONFIGS);
         }
     }
 
-    private void checkUnsupportedConfigs(GroupProtocol groupProtocol, 
List<String> unsupportedConfigs) {
+    private void checkUnsupportedConfigsPostProcess(GroupProtocol 
groupProtocol, List<String> unsupportedConfigs) {
         if 
(getString(GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(groupProtocol.name())) {
             List<String> invalidConfigs = new ArrayList<>();
             unsupportedConfigs.forEach(configName -> {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index a5c92394353..f8827ea2587 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -140,6 +140,8 @@ import static 
org.apache.kafka.common.utils.Utils.propsToMap;
  *     thrown by a failure to commit the acknowledgements.</li>
  *     <li>The application calls {@link #close()}  which releases any acquired 
records without acknowledgement.</li>
  * </ul>
+ * <p>The consumer can optionally use the {@code 
internal.share.acknowledgement.mode} configuration property to choose
+ * between implicit and explicit acknowledgement, specifying 
<code>"implicit"</code> or <code>"explicit"</code> as required.
  * <p>
  * The consumer guarantees that the records returned in the {@code 
ConsumerRecords} object for a specific topic-partition
  * are in order of increasing offset. For each topic-partition, Kafka 
guarantees that acknowledgements for the records
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
index 2d926b2cad5..900c249d852 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumer.java
@@ -32,6 +32,8 @@ import java.util.Optional;
 import java.util.Set;
 
 /**
+ * A client that consumes records from a Kafka cluster using a share group.
+ *
  * @see KafkaShareConsumer
  * @see MockShareConsumer
  */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java
index ce22d7b0a2e..84971abf1e1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ShareConsumerConfig.java
@@ -24,9 +24,9 @@ import java.util.Map;
 import java.util.Properties;
 
 /**
- * The share consumer configuration keys
+ * The consumer configuration behavior specific to share groups.
  */
-public class ShareConsumerConfig extends ConsumerConfig {
+class ShareConsumerConfig extends ConsumerConfig {
     /**
      * A list of configuration keys not supported for SHARE consumer.
      */
@@ -43,11 +43,11 @@ public class ShareConsumerConfig extends ConsumerConfig {
             ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG
     );
 
-    public ShareConsumerConfig(Properties props) {
+    ShareConsumerConfig(Properties props) {
         super(props);
     }
 
-    public ShareConsumerConfig(Map<String, Object> props) {
+    ShareConsumerConfig(Map<String, Object> props) {
         super(props);
     }
 
@@ -57,11 +57,11 @@ public class ShareConsumerConfig extends ConsumerConfig {
 
     @Override
     protected Map<String, Object> preProcessParsedConfig(final Map<String, 
Object> parsedValues) {
-        checkUnsupportedConfigs(parsedValues);
+        checkUnsupportedConfigsPreProcess(parsedValues);
         return parsedValues;
     }
 
-    private void checkUnsupportedConfigs(Map<String, Object> parsedValues) {
+    private void checkUnsupportedConfigsPreProcess(Map<String, Object> 
parsedValues) {
         List<String> invalidConfigs = new ArrayList<>();
         SHARE_GROUP_UNSUPPORTED_CONFIGS.forEach(configName -> {
             if (parsedValues.containsKey(configName)) {
@@ -74,4 +74,7 @@ public class ShareConsumerConfig extends ConsumerConfig {
         }
     }
 
+    @Override
+    protected void checkUnsupportedConfigsPostProcess() {
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 4dc8cf4c697..7763f308cb0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -187,7 +187,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         IMPLICIT
     }
 
-    private AcknowledgementMode acknowledgementMode = 
AcknowledgementMode.UNKNOWN;
+    private AcknowledgementMode acknowledgementMode;
 
     /**
      * A thread-safe {@link ShareFetchBuffer fetch buffer} for the results 
that are populated in the
@@ -258,6 +258,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             this.metrics = createMetrics(config, time, reporters);
             this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
 
+            this.acknowledgementMode = initializeAcknowledgementMode(config, 
log);
             this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer, metrics);
             this.currentFetch = ShareFetch.empty();
             this.subscriptions = createSubscriptionState(config, logContext);
@@ -369,6 +370,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.subscriptions = subscriptions;
         this.metadata = metadata;
         this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
+        this.acknowledgementMode = initializeAcknowledgementMode(config, log);
         this.fetchBuffer = new ShareFetchBuffer(logContext);
         this.completedAcknowledgements = new LinkedList<>();
 
@@ -463,6 +465,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.metrics = metrics;
         this.metadata = metadata;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
+        this.acknowledgementMode = initializeAcknowledgementMode(null, log);
         this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer, metrics);
         this.currentFetch = ShareFetch.empty();
         this.applicationEventHandler = applicationEventHandler;
@@ -1062,6 +1065,25 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         }
     }
 
+    /**
+     * Initializes the acknowledgement mode based on the configuration.
+     */
+    private static AcknowledgementMode 
initializeAcknowledgementMode(ConsumerConfig config, Logger log) {
+        if (config == null) {
+            return AcknowledgementMode.UNKNOWN;
+        }
+        String acknowledgementModeStr = 
config.getString(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
+        if ((acknowledgementModeStr == null) || 
acknowledgementModeStr.isEmpty()) {
+            return AcknowledgementMode.UNKNOWN;
+        } else if (acknowledgementModeStr.equalsIgnoreCase("implicit")) {
+            return AcknowledgementMode.IMPLICIT;
+        } else if (acknowledgementModeStr.equalsIgnoreCase("explicit")) {
+            return AcknowledgementMode.EXPLICIT;
+        }
+        log.warn("Invalid value for config {}: \"{}\"", 
ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
acknowledgementModeStr);
+        return AcknowledgementMode.UNKNOWN;
+    }
+
     /**
      * Process the events—if any—that were produced by the {@link 
ConsumerNetworkThread network thread}.
      * It is possible that {@link ErrorEvent an error}
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java 
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index 0ee38d8aba5..f485e4f816e 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -75,7 +75,6 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -160,7 +159,7 @@ public class ShareConsumerTest {
     public void testPollNoSubscribeFails() {
         setup();
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
-            assertEquals(Collections.emptySet(), shareConsumer.subscription());
+            assertEquals(Set.of(), shareConsumer.subscription());
             // "Consumer is not subscribed to any topics."
             assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
         }
@@ -171,7 +170,7 @@ public class ShareConsumerTest {
         setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
-            Set<String> subscription = Collections.singleton(tp.topic());
+            Set<String> subscription = Set.of(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
@@ -185,12 +184,12 @@ public class ShareConsumerTest {
         setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
-            Set<String> subscription = Collections.singleton(tp.topic());
+            Set<String> subscription = Set.of(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
             shareConsumer.unsubscribe();
-            assertEquals(Collections.emptySet(), shareConsumer.subscription());
+            assertEquals(Set.of(), shareConsumer.subscription());
             assertEquals(0, records.count());
             verifyShareGroupStateTopicRecordsProduced();
         }
@@ -201,7 +200,7 @@ public class ShareConsumerTest {
         setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
-            Set<String> subscription = Collections.singleton(tp.topic());
+            Set<String> subscription = Set.of(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
@@ -219,12 +218,12 @@ public class ShareConsumerTest {
         setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
-            Set<String> subscription = Collections.singleton(tp.topic());
+            Set<String> subscription = Set.of(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
             shareConsumer.unsubscribe();
-            assertEquals(Collections.emptySet(), shareConsumer.subscription());
+            assertEquals(Set.of(), shareConsumer.subscription());
             // "Consumer is not subscribed to any topics."
             assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
             assertEquals(0, records.count());
@@ -237,12 +236,12 @@ public class ShareConsumerTest {
         setup();
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
-            Set<String> subscription = Collections.singleton(tp.topic());
+            Set<String> subscription = Set.of(tp.topic());
             shareConsumer.subscribe(subscription);
             assertEquals(subscription, shareConsumer.subscription());
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(500));
-            shareConsumer.subscribe(Collections.emptySet());
-            assertEquals(Collections.emptySet(), shareConsumer.subscription());
+            shareConsumer.subscribe(Set.of());
+            assertEquals(Set.of(), shareConsumer.subscription());
             // "Consumer is not subscribed to any topics."
             assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(500)));
             assertEquals(0, records.count());
@@ -260,7 +259,7 @@ public class ShareConsumerTest {
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
             producer.flush();
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             verifyShareGroupStateTopicRecordsProduced();
@@ -277,7 +276,7 @@ public class ShareConsumerTest {
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
             producer.flush();
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             producer.send(record);
@@ -307,12 +306,12 @@ public class ShareConsumerTest {
             producer.flush();
             shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, 
partitionExceptionMap));
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
                 DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records 
for share consumer");
 
-            shareConsumer.subscribe(Collections.singletonList(tp2.topic()));
+            shareConsumer.subscribe(Set.of(tp2.topic()));
 
             // Waiting for heartbeat to propagate the subscription change.
             TestUtils.waitForCondition(() -> {
@@ -342,7 +341,7 @@ public class ShareConsumerTest {
             producer.flush();
 
             shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, 
partitionExceptionMap));
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
                 DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records 
for share consumer");
@@ -371,7 +370,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
             shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, 
partitionExceptionMap));
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
@@ -403,7 +402,7 @@ public class ShareConsumerTest {
             producer.flush();
 
             shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, 
partitionExceptionMap));
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
@@ -457,7 +456,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, numRecords);
             assertEquals(numRecords, records.size());
@@ -489,7 +488,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, numRecords);
             assertEquals(numRecords, records.size());
@@ -511,12 +510,12 @@ public class ShareConsumerTest {
 
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1",
-            Collections.singletonMap(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
String.valueOf(maxPollRecords)))) {
+            Map.of(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
String.valueOf(maxPollRecords)))) {
 
             long startingTimestamp = System.currentTimeMillis();
             produceMessagesWithTimestamp(numRecords, startingTimestamp);
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, numRecords);
             long i = 0L;
@@ -564,7 +563,7 @@ public class ShareConsumerTest {
             transactionalProducer.close();
             nonTransactionalProducer.close();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(4, records.count());
@@ -594,7 +593,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             records.forEach(shareConsumer::acknowledge);
@@ -616,7 +615,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             records.forEach(shareConsumer::acknowledge);
@@ -645,8 +644,8 @@ public class ShareConsumerTest {
             producer.send(record3);
             producer.flush();
 
-            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
-            shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer1.subscribe(Set.of(tp.topic()));
+            shareConsumer2.subscribe(Set.of(tp.topic()));
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new 
HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap1 = new 
HashMap<>();
@@ -693,7 +692,7 @@ public class ShareConsumerTest {
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new 
HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap1 = new 
HashMap<>();
@@ -739,7 +738,7 @@ public class ShareConsumerTest {
             producer.send(record3);
             producer.flush();
 
-            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer1.subscribe(Set.of(tp.topic()));
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap = new 
HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap = new 
HashMap<>();
@@ -799,7 +798,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@@ -822,7 +821,7 @@ public class ShareConsumerTest {
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
             producer.flush();
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@@ -844,7 +843,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
@@ -863,7 +862,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             ConsumerRecord<byte[], byte[]> consumedRecord = 
records.records(tp).get(0);
@@ -886,7 +885,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             ConsumerRecord<byte[], byte[]> consumedRecord = 
records.records(tp).get(0);
@@ -908,7 +907,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer.commitSync();
@@ -936,7 +935,7 @@ public class ShareConsumerTest {
             producer.send(record3);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new 
HashMap<>();
             Map<TopicPartition, Exception> partitionExceptionMap1 = new 
HashMap<>();
@@ -961,6 +960,52 @@ public class ShareConsumerTest {
         }
     }
 
+    @ClusterTest
+    public void testConfiguredExplicitAcknowledgeCommitSuccess() {
+        setup();
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+            ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                "group1",
+                
Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"))) 
{
+
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+            assertEquals(1, records.count());
+            records.forEach(shareConsumer::acknowledge);
+            producer.send(record);
+            Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer.commitSync();
+            assertEquals(1, result.size());
+            records = shareConsumer.poll(Duration.ofMillis(5000));
+            assertEquals(1, records.count());
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void testConfiguredImplicitAcknowledgeExplicitAcknowledgeFails() {
+        setup();
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 
Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "implicit"))) 
{
+
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.subscribe(Set.of(tp.topic()));
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+            assertEquals(1, records.count());
+            assertThrows(IllegalStateException.class, () -> 
shareConsumer.acknowledge(records.iterator().next()));
+        }
+    }
+
     @ClusterTest
     public void testFetchRecordLargerThanMaxPartitionFetchBytes() throws 
Exception {
         setup();
@@ -971,16 +1016,14 @@ public class ShareConsumerTest {
             Producer<byte[], byte[]> producer = createProducer();
             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
                 "group1",
-                
Collections.singletonMap(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
String.valueOf(maxPartitionFetchBytes))
-            )
-        ) {
+                Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
String.valueOf(maxPartitionFetchBytes)))) {
 
             ProducerRecord<byte[], byte[]> smallRecord = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             ProducerRecord<byte[], byte[]> bigRecord = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), new 
byte[maxPartitionFetchBytes]);
             producer.send(smallRecord).get();
             producer.send(bigRecord).get();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
             assertEquals(2, records.count());
@@ -999,9 +1042,8 @@ public class ShareConsumerTest {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
 
-            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
-
-            shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer1.subscribe(Set.of(tp.topic()));
+            shareConsumer2.subscribe(Set.of(tp.topic()));
 
             // producing 3 records to the topic
             producer.send(record);
@@ -1049,8 +1091,8 @@ public class ShareConsumerTest {
              ShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
-            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
-            shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer1.subscribe(Set.of(tp.topic()));
+            shareConsumer2.subscribe(Set.of(tp.topic()));
 
             int totalMessages = 2000;
             for (int i = 0; i < totalMessages; i++) {
@@ -1185,8 +1227,8 @@ public class ShareConsumerTest {
              ShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
-            shareConsumer1.subscribe(Collections.singleton(tp.topic()));
-            shareConsumer2.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer1.subscribe(Set.of(tp.topic()));
+            shareConsumer2.subscribe(Set.of(tp.topic()));
 
             int totalMessages = 1500;
             for (int i = 0; i < totalMessages; i++) {
@@ -1347,7 +1389,7 @@ public class ShareConsumerTest {
             producer.flush();
 
             shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallbackWithShareConsumer<>(shareConsumer));
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             // The acknowledgment commit callback will try to call a method of 
ShareConsumer
             shareConsumer.poll(Duration.ofMillis(5000));
@@ -1370,7 +1412,7 @@ public class ShareConsumerTest {
         public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, 
Exception exception) {
             // Accessing methods of ShareConsumer should throw an exception.
             assertThrows(IllegalStateException.class, shareConsumer::close);
-            assertThrows(IllegalStateException.class, () -> 
shareConsumer.subscribe(Collections.singleton(tp.topic())));
+            assertThrows(IllegalStateException.class, () -> 
shareConsumer.subscribe(Set.of(tp.topic())));
             assertThrows(IllegalStateException.class, () -> 
shareConsumer.poll(Duration.ofMillis(5000)));
         }
     }
@@ -1392,7 +1434,7 @@ public class ShareConsumerTest {
 
             // The acknowledgment commit callback will try to call a method of 
ShareConsumer
             shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallbackWakeup<>(shareConsumer));
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
                 DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records 
for share consumer");
@@ -1444,7 +1486,7 @@ public class ShareConsumerTest {
             producer.flush();
 
             shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallbackThrows<>());
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
                 DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records 
for share consumer");
@@ -1479,7 +1521,7 @@ public class ShareConsumerTest {
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             // interrupt the thread and call poll
             try {
@@ -1504,7 +1546,7 @@ public class ShareConsumerTest {
         alterShareAutoOffsetReset("group1", "earliest");
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
-            shareConsumer.subscribe(Collections.singleton("topic abc"));
+            shareConsumer.subscribe(Set.of("topic abc"));
 
             // The exception depends upon a metadata response which arrives 
asynchronously. If the delay is
             // too short, the poll might return before the error is known.
@@ -1527,7 +1569,7 @@ public class ShareConsumerTest {
             producer.send(record);
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             shareConsumer.wakeup();
             assertThrows(WakeupException.class, () -> 
shareConsumer.poll(Duration.ZERO));
@@ -1546,7 +1588,7 @@ public class ShareConsumerTest {
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
 
             String topic = "foo";
-            shareConsumer.subscribe(Collections.singleton(topic));
+            shareConsumer.subscribe(Set.of(topic));
 
             // Topic is created post creation of share consumer and 
subscription
             createTopic(topic);
@@ -1630,7 +1672,7 @@ public class ShareConsumerTest {
             }
 
             // We delete records before offset 5, so the LSO should move to 5.
-            adminClient.deleteRecords(Collections.singletonMap(tp, 
RecordsToDelete.beforeOffset(5L)));
+            adminClient.deleteRecords(Map.of(tp, 
RecordsToDelete.beforeOffset(5L)));
 
             int messageCount = consumeMessages(new AtomicInteger(0), 5, 
groupId, 1, 10, true);
             // The records returned belong to offsets 5-9.
@@ -1642,14 +1684,14 @@ public class ShareConsumerTest {
             }
 
             // We delete records before offset 14, so the LSO should move to 
14.
-            adminClient.deleteRecords(Collections.singletonMap(tp, 
RecordsToDelete.beforeOffset(14L)));
+            adminClient.deleteRecords(Map.of(tp, 
RecordsToDelete.beforeOffset(14L)));
 
             int consumeMessagesCount = consumeMessages(new AtomicInteger(0), 
1, groupId, 1, 10, true);
             // The record returned belong to offset 14.
             assertEquals(1, consumeMessagesCount);
 
             // We delete records before offset 15, so the LSO should move to 
15 and now no records should be returned.
-            adminClient.deleteRecords(Collections.singletonMap(tp, 
RecordsToDelete.beforeOffset(15L)));
+            adminClient.deleteRecords(Map.of(tp, 
RecordsToDelete.beforeOffset(15L)));
 
             messageCount = consumeMessages(new AtomicInteger(0), 0, groupId, 
1, 5, true);
             assertEquals(0, messageCount);
@@ -1663,7 +1705,7 @@ public class ShareConsumerTest {
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1");
              Producer<byte[], byte[]> producer = createProducer()) {
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             // Producing a record.
             producer.send(record);
@@ -1691,7 +1733,7 @@ public class ShareConsumerTest {
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1");
              Producer<byte[], byte[]> producer = createProducer()) {
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             // Producing a record.
             producer.send(record);
@@ -1719,8 +1761,7 @@ public class ShareConsumerTest {
             Producer<byte[], byte[]> producer = createProducer();
             Admin adminClient = createAdminClient()
         ) {
-
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             // We write 10 records to the topic, so they would be written from 
offsets 0-9 on the topic.
@@ -1729,7 +1770,7 @@ public class ShareConsumerTest {
             }
 
             // We delete records before offset 5, so the LSO should move to 5.
-            adminClient.deleteRecords(Collections.singletonMap(tp, 
RecordsToDelete.beforeOffset(5L)));
+            adminClient.deleteRecords(Map.of(tp, 
RecordsToDelete.beforeOffset(5L)));
 
             int consumedMessageCount = consumeMessages(new AtomicInteger(0), 
5, "group1", 1, 10, true);
             // The records returned belong to offsets 5-9.
@@ -1747,9 +1788,8 @@ public class ShareConsumerTest {
              ShareConsumer<byte[], byte[]> shareConsumerLatest = 
createShareConsumer("group2");
              Producer<byte[], byte[]> producer = createProducer()) {
 
-            shareConsumerEarliest.subscribe(Collections.singleton(tp.topic()));
-
-            shareConsumerLatest.subscribe(Collections.singleton(tp.topic()));
+            shareConsumerEarliest.subscribe(Set.of(tp.topic()));
+            shareConsumerLatest.subscribe(Set.of(tp.topic()));
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             // Producing a record.
@@ -1805,7 +1845,7 @@ public class ShareConsumerTest {
             producer.send(currentRecord).get();
             producer.flush();
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             
             // Should only receive messages from last hour (recent and current)
             List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, 2);
@@ -1824,7 +1864,7 @@ public class ShareConsumerTest {
         try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group2");
              Producer<byte[], byte[]> producer = createProducer()) {
 
-            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+            shareConsumer.subscribe(Set.of(tp.topic()));
             List<ConsumerRecord<byte[], byte[]>> records = 
consumeRecords(shareConsumer, 3);
             assertEquals(3, records.size());
             verifyShareGroupStateTopicRecordsProduced();
@@ -2123,7 +2163,7 @@ public class ShareConsumerTest {
         return assertDoesNotThrow(() -> {
             try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(
                     groupId)) {
-                shareConsumer.subscribe(Collections.singleton(tp.topic()));
+                shareConsumer.subscribe(Set.of(tp.topic()));
                 return consumeMessages(shareConsumer, totalMessagesConsumed, 
totalMessages, consumerNumber, maxPolls, commit);
             }
         }, "Consumer " + consumerNumber + " failed with exception");
@@ -2140,7 +2180,7 @@ public class ShareConsumerTest {
             try (ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(
                     groupId,
                     Map.of(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
maxFetchBytes))) {
-                shareConsumer.subscribe(Collections.singleton(tp.topic()));
+                shareConsumer.subscribe(Set.of(tp.topic()));
                 return consumeMessages(shareConsumer, totalMessagesConsumed, 
totalMessages, consumerNumber, maxPolls, commit);
             }
         }, "Consumer " + consumerNumber + " failed with exception");
@@ -2200,7 +2240,7 @@ public class ShareConsumerTest {
         AtomicReference<Uuid> topicId = new AtomicReference<>(null);
         assertDoesNotThrow(() -> {
             try (Admin admin = createAdminClient()) {
-                CreateTopicsResult result = 
admin.createTopics(Collections.singleton(new NewTopic(topicName, numPartitions, 
(short) replicationFactor)));
+                CreateTopicsResult result = admin.createTopics(Set.of(new 
NewTopic(topicName, numPartitions, (short) replicationFactor)));
                 result.all().get();
                 topicId.set(result.topicId(topicName).get());
             }
@@ -2212,7 +2252,7 @@ public class ShareConsumerTest {
     private void deleteTopic(String topicName) {
         assertDoesNotThrow(() -> {
             try (Admin admin = createAdminClient()) {
-                
admin.deleteTopics(Collections.singleton(topicName)).all().get();
+                admin.deleteTopics(Set.of(topicName)).all().get();
             }
         }, "Failed to delete topic");
     }
@@ -2257,7 +2297,7 @@ public class ShareConsumerTest {
         createTopic(warmupTp.topic());
         waitForMetadataCache();
         ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(warmupTp.topic(), warmupTp.partition(), null, 
"key".getBytes(), "value".getBytes());
-        Set<String> subscription = Collections.singleton(warmupTp.topic());
+        Set<String> subscription = Set.of(warmupTp.topic());
         alterShareAutoOffsetReset("warmupgroup1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("warmupgroup1")) {

Reply via email to