This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f737ef31d9b KAFKA-18900: Implement share.acknowledgement.mode to 
choose acknowledgement mode (#19417)
f737ef31d9b is described below

commit f737ef31d9bd5e8bbce11ffd7b9afd2e424cce3d
Author: Shivsundar R <s...@confluent.io>
AuthorDate: Tue Apr 15 11:38:33 2025 -0400

    KAFKA-18900: Implement share.acknowledgement.mode to choose acknowledgement 
mode (#19417)
    
    Choose the acknowledgement mode based on the config
    (`share.acknowledgement.mode`) and not on the basis of how the user
    designs the application.
    - The default value of the config is `IMPLICIT`, so if any
    empty/null/invalid value is configured, then the mode defaults to
    `IMPLICIT`.
    - Removed AcknowledgementModes `UNKNOWN` and `PENDING` as they are no
    longer required.
    - Added code to ensure if the application has any unacknowledged records
    in a batch in "`explicit`" mode, then it will throw an
    `IllegalStateException`. The expectation is if the mode is "explicit",
    all the records received in that `poll()` would be acknowledged before
    the next call to `poll()`.
    - Modified the `ConsoleShareConsumer` to configure the mode to
    "explicit" as it was using the explicit mode of acknowledging records.
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  | 116 +++++++++------------
 .../kafka/clients/consumer/ConsumerConfig.java     |  22 ++--
 .../kafka/clients/consumer/KafkaShareConsumer.java |  84 +++++----------
 .../internals/ShareAcknowledgementMode.java        | 116 +++++++++++++++++++++
 .../consumer/internals/ShareConsumerImpl.java      |  78 ++++----------
 .../internals/ShareAcknowledgementModeTest.java    |  67 ++++++++++++
 .../consumer/internals/ShareConsumerImplTest.java  |  89 +++++++++++++++-
 .../kafka/tools/consumer/ConsoleShareConsumer.java |   8 +-
 8 files changed, 376 insertions(+), 204 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 8d9e4dbd610..e402a4344c1 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -135,6 +135,8 @@ public class ShareConsumerTest {
     private List<TopicPartition> sgsTopicPartitions;
     private static final String KEY = "content-type";
     private static final String VALUE = "application/octet-stream";
+    private static final String EXPLICIT = "explicit";
+    private static final String IMPLICIT = "implicit";
 
     public ShareConsumerTest(ClusterInstance cluster) {
         this.cluster = cluster;
@@ -594,7 +596,7 @@ public class ShareConsumerTest {
     public void testExplicitAcknowledgeSuccess() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
-             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1", 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -615,7 +617,7 @@ public class ShareConsumerTest {
     public void testExplicitAcknowledgeCommitSuccess() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
-             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1", 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -638,7 +640,7 @@ public class ShareConsumerTest {
     public void testExplicitAcknowledgementCommitAsync() throws 
InterruptedException {
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
-             ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1");
+             ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1", 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT));
              ShareConsumer<byte[], byte[]> shareConsumer2 = 
createShareConsumer("group1")) {
 
             ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
@@ -663,15 +665,16 @@ public class ShareConsumerTest {
             // Acknowledging 2 out of the 3 records received via commitAsync.
             ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
             ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+            ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next();
             assertEquals(0L, firstRecord.offset());
             assertEquals(1L, secondRecord.offset());
 
             shareConsumer1.acknowledge(firstRecord);
             shareConsumer1.acknowledge(secondRecord);
+            shareConsumer1.acknowledge(thirdRecord, AcknowledgeType.RELEASE);
             shareConsumer1.commitAsync();
 
-            // The 3rd record should be reassigned to 2nd consumer when it 
polls, kept higher wait time
-            // as time out for locks is 15 secs.
+            // The 3rd record should be reassigned to 2nd consumer when it 
polls.
             TestUtils.waitForCondition(() -> {
                 ConsumerRecords<byte[], byte[]> records2 = 
shareConsumer2.poll(Duration.ofMillis(1000));
                 return records2.count() == 1 && 
records2.iterator().next().offset() == 2L;
@@ -690,51 +693,16 @@ public class ShareConsumerTest {
         }
     }
 
-    @ClusterTest
-    public void testImplicitModeNotTriggeredByPollWhenNoAcksToSend() throws 
InterruptedException {
-        alterShareAutoOffsetReset("group1", "earliest");
-        try (Producer<byte[], byte[]> producer = createProducer();
-             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
-
-            shareConsumer.subscribe(Set.of(tp.topic()));
-
-            Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new 
HashMap<>();
-            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap1, Map.of()));
-
-            // The acknowledgement mode moves to PENDING from UNKNOWN.
-            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
-            assertEquals(0, records.count());
-            shareConsumer.commitAsync();
-
-            ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
-            producer.send(record1);
-            producer.flush();
-
-            // The acknowledgement mode remains in PENDING because no records 
were returned.
-            records = shareConsumer.poll(Duration.ofMillis(5000));
-            assertEquals(1, records.count());
-
-            // The acknowledgement mode now moves to EXPLICIT.
-            shareConsumer.acknowledge(records.iterator().next());
-            shareConsumer.commitAsync();
-
-            TestUtils.waitForCondition(() -> {
-                shareConsumer.poll(Duration.ofMillis(500));
-                return partitionOffsetsMap1.containsKey(tp);
-            }, 30000, 100L, () -> "Didn't receive call to callback");
-            verifyShareGroupStateTopicRecordsProduced();
-        }
-    }
-
     @ClusterTest
     public void testExplicitAcknowledgementCommitAsyncPartialBatch() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
-             ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1")) {
+             ShareConsumer<byte[], byte[]> shareConsumer1 = 
createShareConsumer("group1", 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
 
             ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             ProducerRecord<byte[], byte[]> record2 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             ProducerRecord<byte[], byte[]> record3 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            ProducerRecord<byte[], byte[]> record4 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record1);
             producer.send(record2);
             producer.send(record3);
@@ -753,6 +721,7 @@ public class ShareConsumerTest {
             // Acknowledging 2 out of the 3 records received via commitAsync.
             ConsumerRecord<byte[], byte[]> firstRecord = iterator.next();
             ConsumerRecord<byte[], byte[]> secondRecord = iterator.next();
+            ConsumerRecord<byte[], byte[]> thirdRecord = iterator.next();
             assertEquals(0L, firstRecord.offset());
             assertEquals(1L, secondRecord.offset());
 
@@ -760,21 +729,25 @@ public class ShareConsumerTest {
             shareConsumer1.acknowledge(secondRecord);
             shareConsumer1.commitAsync();
 
-            // The 3rd record should be re-presented to the consumer when it 
polls again.
-            records = shareConsumer1.poll(Duration.ofMillis(5000));
-            assertEquals(1, records.count());
-            iterator = records.iterator();
-            firstRecord = iterator.next();
-            assertEquals(2L, firstRecord.offset());
+            producer.send(record4);
+            producer.flush();
+
+            // The next poll() should throw an IllegalStateException as there 
is still 1 unacknowledged record.
+            // In EXPLICIT acknowledgement mode, we are not allowed to have 
unacknowledged records from a batch.
+            assertThrows(IllegalStateException.class, () -> 
shareConsumer1.poll(Duration.ofMillis(5000)));
+
+            // Acknowledging the 3rd record
+            shareConsumer1.acknowledge(thirdRecord);
+            shareConsumer1.commitAsync();
 
-            // And poll again without acknowledging - the callback will 
receive the acknowledgement responses too
+            // The next poll() will not throw an exception, it would continue 
to fetch more records.
             records = shareConsumer1.poll(Duration.ofMillis(5000));
             assertEquals(1, records.count());
             iterator = records.iterator();
-            firstRecord = iterator.next();
-            assertEquals(2L, firstRecord.offset());
+            ConsumerRecord<byte[], byte[]> fourthRecord = iterator.next();
+            assertEquals(3L, fourthRecord.offset());
 
-            shareConsumer1.acknowledge(firstRecord);
+            shareConsumer1.acknowledge(fourthRecord);
 
             // The callback will receive the acknowledgement responses after 
polling. The callback is
             // called on entry to the poll method or during close. The commit 
is being performed asynchronously, so
@@ -784,6 +757,7 @@ public class ShareConsumerTest {
             shareConsumer1.close();
 
             assertFalse(partitionExceptionMap.containsKey(tp));
+            assertTrue(partitionOffsetsMap.containsKey(tp) && 
partitionOffsetsMap.get(tp).size() == 4);
             verifyShareGroupStateTopicRecordsProduced();
         }
     }
@@ -792,7 +766,7 @@ public class ShareConsumerTest {
     public void testExplicitAcknowledgeReleasePollAccept() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
-             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1", 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -815,7 +789,7 @@ public class ShareConsumerTest {
     public void testExplicitAcknowledgeReleaseAccept() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
-             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1", 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -835,7 +809,7 @@ public class ShareConsumerTest {
     public void testExplicitAcknowledgeReleaseClose() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
-             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1", 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -853,7 +827,7 @@ public class ShareConsumerTest {
     public void testExplicitAcknowledgeThrowsNotInBatch() {
         alterShareAutoOffsetReset("group1", "earliest");
         try (Producer<byte[], byte[]> producer = createProducer();
-             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1", 
Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -970,7 +944,7 @@ public class ShareConsumerTest {
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
                  "group1",
-                 
Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"))) 
{
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
"explicit"))) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -995,7 +969,7 @@ public class ShareConsumerTest {
         try (Producer<byte[], byte[]> producer = createProducer();
              ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
                  "group1",
-                 
Map.of(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "implicit"))) 
{
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
IMPLICIT))) {
 
             ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
             producer.send(record);
@@ -2069,7 +2043,7 @@ public class ShareConsumerTest {
             cluster.bootstrapServers(),
             topicName,
             groupId,
-            Map.of()
+            Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT)
         );
 
         service.schedule(
@@ -2145,7 +2119,8 @@ public class ShareConsumerTest {
         alterShareAutoOffsetReset("group1", "earliest");
         alterShareIsolationLevel("group1", "read_uncommitted");
         try (Producer<byte[], byte[]> transactionalProducer = 
createProducer("T1");
-             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1", Map.of(
+                 ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) 
{
             shareConsumer.subscribe(Set.of(tp.topic()));
             transactionalProducer.initTransactions();
             try {
@@ -2231,7 +2206,8 @@ public class ShareConsumerTest {
         alterShareAutoOffsetReset("group1", "earliest");
         alterShareIsolationLevel("group1", "read_committed");
         try (Producer<byte[], byte[]> transactionalProducer = 
createProducer("T1");
-             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1", Map.of(
+                 ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) 
{
             shareConsumer.subscribe(Set.of(tp.topic()));
             transactionalProducer.initTransactions();
 
@@ -2282,7 +2258,11 @@ public class ShareConsumerTest {
 
                 // Wait for the aborted marker offset for Message 4 (7L) to be 
fetched and acknowledged by the consumer.
                 TestUtils.waitForCondition(() -> {
-                    shareConsumer.poll(Duration.ofMillis(500));
+                    ConsumerRecords<byte[], byte[]> pollRecords = 
shareConsumer.poll(Duration.ofMillis(500));
+                    if (pollRecords.count() > 0) {
+                        // We will release Message 3 again if it was received 
in this poll().
+                        pollRecords.forEach(consumerRecord -> 
shareConsumer.acknowledge(consumerRecord, AcknowledgeType.RELEASE));
+                    }
                     return partitionOffsetsMap2.containsKey(tp) && 
partitionOffsetsMap2.get(tp).contains(7L);
                 }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume abort 
transaction marker offset for Message 4");
 
@@ -2299,7 +2279,7 @@ public class ShareConsumerTest {
                 produceCommittedTransaction(transactionalProducer, "Message 
8");
 
                 // Since isolation level is READ_UNCOMMITTED, we can consume 
Message 3 (committed transaction that was released), Message 5, Message 6, 
Message 7 and Message 8.
-                List<String> finalMessages = new ArrayList<>();
+                Set<String> finalMessages = new HashSet<>();
                 TestUtils.waitForCondition(() -> {
                     ConsumerRecords<byte[], byte[]> pollRecords = 
shareConsumer.poll(Duration.ofMillis(5000));
                     if (pollRecords.count() > 0) {
@@ -2311,11 +2291,8 @@ public class ShareConsumerTest {
                     return finalMessages.size() == 5;
                 }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume all 
records post altering share isolation level");
 
-                assertEquals("Message 3", finalMessages.get(0));
-                assertEquals("Message 5", finalMessages.get(1));
-                assertEquals("Message 6", finalMessages.get(2));
-                assertEquals("Message 7", finalMessages.get(3));
-                assertEquals("Message 8", finalMessages.get(4));
+                Set<String> expected = Set.of("Message 3", "Message 5", 
"Message 6", "Message 7", "Message 8");
+                assertEquals(expected, finalMessages);
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             } finally {
@@ -2330,7 +2307,8 @@ public class ShareConsumerTest {
         alterShareAutoOffsetReset("group1", "earliest");
         alterShareIsolationLevel("group1", "read_committed");
         try (Producer<byte[], byte[]> transactionalProducer = 
createProducer("T1");
-             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1", Map.of(
+                 ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, EXPLICIT))) 
{
             shareConsumer.subscribe(Set.of(tp.topic()));
             transactionalProducer.initTransactions();
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index e5917d9d593..68fe89a4147 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
@@ -381,13 +382,10 @@ public class ConsumerConfig extends AbstractConfig {
     private static final String SECURITY_PROVIDERS_DOC = 
SecurityConfig.SECURITY_PROVIDERS_DOC;
 
     /**
-     * <code>share.acknowledgement.mode</code> is being evaluated as a new 
configuration to control the acknowledgement mode
-     * for share consumers. It will be removed or converted to a proper 
configuration before release.
-     * An alternative being considered is 
<code>enable.explicit.share.acknowledgement</code> as a boolean configuration.
+     * <code>share.acknowledgement.mode</code>
      */
-    public static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = 
"internal.share.acknowledgement.mode";
-    private static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC = 
"Controls the acknowledgement mode for a share consumer." +
-            " If unset, the acknowledgement mode of the consumer is decided by 
the method calls it uses to fetch and commit." +
+    public static final String SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = 
"share.acknowledgement.mode";
+    private static final String SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the 
acknowledgement mode for a share consumer." +
             " If set to <code>implicit</code>, the acknowledgement mode of the 
consumer is implicit and it must not" +
             " use 
<code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to 
acknowledge delivery of records. Instead," +
             " delivery is acknowledged implicitly on the next call to poll or 
commit." +
@@ -401,7 +399,7 @@ public class ConsumerConfig extends AbstractConfig {
      */
     private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = 
List.of(
             GROUP_REMOTE_ASSIGNOR_CONFIG,
-            INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
+            SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
     );
 
     /**
@@ -411,7 +409,7 @@ public class ConsumerConfig extends AbstractConfig {
             PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
             HEARTBEAT_INTERVAL_MS_CONFIG, 
             SESSION_TIMEOUT_MS_CONFIG,
-            INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
+            SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
     );
     
     static {
@@ -695,12 +693,12 @@ public class ConsumerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.LOW,
                                         
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
-                                
.define(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
+                                
.define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
                                         Type.STRING,
-                                        null,
-                                        in(null, "implicit", "explicit"),
+                                        
ShareAcknowledgementMode.IMPLICIT.name(),
+                                        new 
ShareAcknowledgementMode.Validator(),
                                         Importance.MEDIUM,
-                                        
ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC);
+                                        
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_DOC);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
index d694984db57..b5a862f239d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java
@@ -116,33 +116,33 @@ import static 
org.apache.kafka.common.utils.Utils.propsToMap;
  * {@code group.share.record.lock.partition.limit}. By limiting the duration 
of the acquisition lock and automatically
  * releasing the locks, the broker ensures delivery progresses even in the 
presence of consumer failures.
  * <p>
- * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes.
- * <p>If the application calls {@link #acknowledge(ConsumerRecord, 
AcknowledgeType)} for any record in the batch,
- * it is using <em>explicit acknowledgement</em>. In this case:
- * <ul>
- *     <li>The application calls {@link #commitSync()} or {@link 
#commitAsync()} which commits the acknowledgements to Kafka.
- *     If any records in the batch were not acknowledged, they remain acquired 
and will be presented to the application
- *     in response to a future poll.</li>
- *     <li>The application calls {@link #poll(Duration)} without committing 
first, which commits the acknowledgements to
- *     Kafka asynchronously. In this case, no exception is thrown by a failure 
to commit the acknowledgement.
- *     If any records in the batch were not acknowledged, they remain acquired 
and will be presented to the application
- *     in response to a future poll.</li>
- *     <li>The application calls {@link #close()} which attempts to commit any 
pending acknowledgements and
- *     releases any remaining acquired records.</li>
- * </ul>
- * If the application does not call {@link #acknowledge(ConsumerRecord, 
AcknowledgeType)} for any record in the batch,
- * it is using <em>implicit acknowledgement</em>. In this case:
+ * The consumer can choose to use implicit or explicit acknowledgement of the 
records it processes by configuring the
+ * consumer {@code share.acknowledgement.mode} property.
+ * <p>
+ * If the application sets the property to "implicit" or does not set it at 
all, then the consumer is using
+ * <em>implicit acknowledgement</em>. In this mode, the application 
acknowledges delivery by:
  * <ul>
- *     <li>The application calls {@link #commitSync()} or {@link 
#commitAsync()} which implicitly acknowledges all of
- *     the delivered records as processed successfully and commits the 
acknowledgements to Kafka.</li>
- *     <li>The application calls {@link #poll(Duration)} without committing, 
which also implicitly acknowledges all of
+ *     <li>Calling {@link #poll(Duration)} without committing, which also 
implicitly acknowledges all
  *     the delivered records and commits the acknowledgements to Kafka 
asynchronously. In this case, no exception is
  *     thrown by a failure to commit the acknowledgements.</li>
- *     <li>The application calls {@link #close()}  which releases any acquired 
records without acknowledgement.</li>
+ *     <li>Calling {@link #commitSync()} or {@link #commitAsync()} which 
implicitly acknowledges all
+ *     the delivered records as processed successfully and commits the 
acknowledgements to Kafka.</li>
+ *     <li>Calling {@link #close()} which releases any acquired records 
without acknowledgement.</li>
+ * </ul>
+ * If the application sets the property to "explicit", then the consumer is 
using <em>explicit acknowledgment</em>.
+ * The application must acknowledge all records returned from {@link 
#poll(Duration)} using
+ * {@link #acknowledge(ConsumerRecord, AcknowledgeType)} before its next call 
to {@link #poll(Duration)}.
+ * If the application calls {@link #poll(Duration)} without having 
acknowledged all records, an
+ * {@link IllegalStateException} is thrown. The remaining unacknowledged 
records can still be acknowledged.
+ * In this mode, the application acknowledges delivery by:
+ * <ul>
+ *     <li>Calling {@link #poll(Duration)} after it has acknowledged all 
records, which commits the acknowledgements
+ *     to Kafka asynchronously. In this case, no exception is thrown by a 
failure to commit the acknowledgements.</li>
+ *     <li>Calling {@link #commitSync()} or {@link #commitAsync()} which 
commits any pending
+ *     acknowledgements to Kafka.</li>
+ *     <li>Calling {@link #close()} which attempts to commit any pending 
acknowledgements and releases
+ *     any remaining acquired records.</li>
  * </ul>
- * <p>The consumer can optionally use the {@code 
internal.share.acknowledgement.mode} configuration property to choose
- * between implicit and explicit acknowledgement, specifying 
<code>"implicit"</code> or <code>"explicit"</code> as required.
- * <p>
  * The consumer guarantees that the records returned in the {@code 
ConsumerRecords} object for a specific topic-partition
  * are in order of increasing offset. For each topic-partition, Kafka 
guarantees that acknowledgements for the records
  * in a batch are performed atomically. This makes error handling 
significantly more straightforward because there can be
@@ -195,12 +195,14 @@ import static 
org.apache.kafka.common.utils.Utils.propsToMap;
  *
  * <h4>Per-record acknowledgement (explicit acknowledgement)</h4>
  * This example demonstrates using different acknowledgement types depending 
on the outcome of processing the records.
+ * Here the {@code share.acknowledgement.mode} property is set to "explicit" 
so the consumer must explicitly acknowledge each record.
  * <pre>
  *     Properties props = new Properties();
  *     props.setProperty(&quot;bootstrap.servers&quot;, 
&quot;localhost:9092&quot;);
  *     props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
  *     props.setProperty(&quot;key.deserializer&quot;, 
&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
  *     props.setProperty(&quot;value.deserializer&quot;, 
&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
+ *     props.setProperty(&quot;share.acknowledgement.mode&quot;, 
&quot;explicit&quot;);
  *     KafkaShareConsumer&lt;String, String&gt; consumer = new 
KafkaShareConsumer&lt;&gt;(props);
  *     consumer.subscribe(Arrays.asList(&quot;foo&quot;));
  *     while (true) {
@@ -227,42 +229,6 @@ import static 
org.apache.kafka.common.utils.Utils.propsToMap;
  * It is only once {@link #commitSync()} is called that the acknowledgements 
are committed by sending the new state
  * information to Kafka.
  *
- * <h4>Per-record acknowledgement, ending processing of the batch on an error 
(explicit acknowledgement)</h4>
- * This example demonstrates ending processing of a batch of records on the 
first error.
- * <pre>
- *     Properties props = new Properties();
- *     props.setProperty(&quot;bootstrap.servers&quot;, 
&quot;localhost:9092&quot;);
- *     props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
- *     props.setProperty(&quot;key.deserializer&quot;, 
&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     props.setProperty(&quot;value.deserializer&quot;, 
&quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
- *     KafkaShareConsumer&lt;String, String&gt; consumer = new 
KafkaShareConsumer&lt;&gt;(props);
- *     consumer.subscribe(Arrays.asList(&quot;foo&quot;));
- *     while (true) {
- *         ConsumerRecords&lt;String, String&gt; records = 
consumer.poll(Duration.ofMillis(100));
- *         for (ConsumerRecord&lt;String, String&gt; record : records) {
- *             try {
- *                 doProcessing(record);
- *                 consumer.acknowledge(record, AcknowledgeType.ACCEPT);
- *             } catch (Exception e) {
- *                 consumer.acknowledge(record, AcknowledgeType.REJECT);
- *                 break;
- *             }
- *         }
- *         consumer.commitSync();
- *     }
- * </pre>
- * There are the following cases in this example:
- * <ol>
- *     <li>The batch contains no records, in which case the application just 
polls again. The call to {@link #commitSync()}
- *     just does nothing because the batch was empty.</li>
- *     <li>All of the records in the batch are processed successfully. The 
calls to {@link #acknowledge(ConsumerRecord, AcknowledgeType)}
- *     specifying {@code AcknowledgeType.ACCEPT} mark all records in the batch 
as successfully processed.</li>
- *     <li>One of the records encounters an exception. The call to {@link 
#acknowledge(ConsumerRecord, AcknowledgeType)} specifying
- *     {@code AcknowledgeType.REJECT} rejects that record. Earlier records in 
the batch have already been marked as successfully
- *     processed. The call to {@link #commitSync()} commits the 
acknowledgements, but the records after the failed record
- *     remain acquired as part of the same delivery attempt and will be 
presented to the application in response to another poll.</li>
- * </ol>
- *
  * <h3>Reading Transactional Records</h3>
  * The way that share groups handle transactional records is controlled by the 
{@code group.share.isolation.level}</code>
  * configuration property. In a share group, the isolation level applies to 
the entire share group, not just individual
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java
new file mode 100644
index 00000000000..0f8effa30a1
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementMode.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+public class ShareAcknowledgementMode {
+    public enum AcknowledgementMode {
+        IMPLICIT, EXPLICIT;
+
+        @Override
+        public String toString() {
+            return super.toString().toLowerCase(Locale.ROOT);
+        }
+    }
+
+    private final AcknowledgementMode acknowledgementMode;
+
+    public static final ShareAcknowledgementMode IMPLICIT = new 
ShareAcknowledgementMode(AcknowledgementMode.IMPLICIT);
+    public static final ShareAcknowledgementMode EXPLICIT = new 
ShareAcknowledgementMode(AcknowledgementMode.EXPLICIT);
+
+    private ShareAcknowledgementMode(AcknowledgementMode acknowledgementMode) {
+        this.acknowledgementMode = acknowledgementMode;
+    }
+
+    /**
+     * Returns the ShareAcknowledgementMode from the given string.
+     */
+    public static ShareAcknowledgementMode fromString(String 
acknowledgementMode) {
+        if (acknowledgementMode == null) {
+            throw new IllegalArgumentException("Acknowledgement mode is null");
+        }
+
+        if 
(Arrays.asList(Utils.enumOptions(AcknowledgementMode.class)).contains(acknowledgementMode))
 {
+            AcknowledgementMode mode = 
AcknowledgementMode.valueOf(acknowledgementMode.toUpperCase(Locale.ROOT));
+            switch (mode) {
+                case IMPLICIT:
+                    return IMPLICIT;
+                case EXPLICIT:
+                    return EXPLICIT;
+                default:
+                    throw new IllegalArgumentException("Invalid 
acknowledgement mode: " + acknowledgementMode);
+            }
+        } else {
+            throw new IllegalArgumentException("Invalid acknowledgement mode: 
" + acknowledgementMode);
+        }
+    }
+
+    /**
+     * Returns the name of the acknowledgement mode.
+     */
+    public String name() {
+        return acknowledgementMode.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ShareAcknowledgementMode that = (ShareAcknowledgementMode) o;
+        return acknowledgementMode == that.acknowledgementMode;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(acknowledgementMode);
+    }
+
+    @Override
+    public String toString() {
+        return "ShareAcknowledgementMode{" +
+                "mode=" + acknowledgementMode +
+                '}';
+    }
+
+    public static class Validator implements ConfigDef.Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            String acknowledgementMode = (String) value;
+            try {
+                fromString(acknowledgementMode);
+            } catch (Exception e) {
+                throw new ConfigException(name, value, "Invalid value `" + 
acknowledgementMode + "` for configuration " +
+                        name + ". The value must either be 'implicit' or 
'explicit'.");
+            }
+        }
+
+        @Override
+        public String toString() {
+            String values = 
Arrays.stream(ShareAcknowledgementMode.AcknowledgementMode.values())
+                    
.map(ShareAcknowledgementMode.AcknowledgementMode::toString).collect(Collectors.joining(",
 "));
+            return "[" + values + "]";
+        }
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index c77443e39a1..bb5193f8dc6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -176,20 +176,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     private ShareFetch<K, V> currentFetch;
     private AcknowledgementCommitCallbackHandler 
acknowledgementCommitCallbackHandler;
     private final List<Map<TopicIdPartition, Acknowledgements>> 
completedAcknowledgements;
-
-    private enum AcknowledgementMode {
-        /** Acknowledgement mode is not yet known */
-        UNKNOWN,
-        /** Acknowledgement mode is pending, meaning that {@link 
#poll(Duration)} has been called once and
-         * {@link #acknowledge(ConsumerRecord, AcknowledgeType)} has not been 
called */
-        PENDING,
-        /** Acknowledgements are explicit, using {@link 
#acknowledge(ConsumerRecord, AcknowledgeType)} */
-        EXPLICIT,
-        /** Acknowledgements are implicit, not using {@link 
#acknowledge(ConsumerRecord, AcknowledgeType)} */
-        IMPLICIT
-    }
-
-    private AcknowledgementMode acknowledgementMode;
+    private final ShareAcknowledgementMode acknowledgementMode;
 
     /**
      * A thread-safe {@link ShareFetchBuffer fetch buffer} for the results 
that are populated in the
@@ -457,7 +444,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                       final ConsumerMetadata metadata,
                       final int requestTimeoutMs,
                       final int defaultApiTimeoutMs,
-                      final String groupId) {
+                      final String groupId,
+                      final String acknowledgementModeConfig) {
         this.log = logContext.logger(getClass());
         this.subscriptions = subscriptions;
         this.clientId = clientId;
@@ -472,7 +460,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.metadata = metadata;
         this.requestTimeoutMs = requestTimeoutMs;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
-        this.acknowledgementMode = initializeAcknowledgementMode(null, log);
+        this.acknowledgementMode = 
ShareAcknowledgementMode.fromString(acknowledgementModeConfig);
         this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer, metrics);
         this.currentFetch = ShareFetch.empty();
         this.applicationEventHandler = applicationEventHandler;
@@ -582,7 +570,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             handleCompletedAcknowledgements();
 
             // If using implicit acknowledgement, acknowledge the previously 
fetched records
-            acknowledgeBatchIfImplicitAcknowledgement(true);
+            acknowledgeBatchIfImplicitAcknowledgement();
 
             kafkaShareConsumerMetrics.recordPollStart(timer.currentTimeMs());
 
@@ -674,6 +662,10 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 // Notify the network thread to wake up and start the next 
round of fetching
                 applicationEventHandler.wakeupNetworkThread();
             }
+            if (acknowledgementMode == ShareAcknowledgementMode.EXPLICIT) {
+                // We cannot leave unacknowledged records in EXPLICIT 
acknowledgement mode, so we throw an exception to the application.
+                throw new IllegalStateException("All records must be 
acknowledged in explicit acknowledgement mode.");
+            }
             return currentFetch;
         }
     }
@@ -719,7 +711,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             handleCompletedAcknowledgements();
 
             // If using implicit acknowledgement, acknowledge the previously 
fetched records
-            acknowledgeBatchIfImplicitAcknowledgement(false);
+            acknowledgeBatchIfImplicitAcknowledgement();
 
             Timer requestTimer = time.timer(timeout.toMillis());
             Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = 
acknowledgementsToSend();
@@ -763,7 +755,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             handleCompletedAcknowledgements();
 
             // If using implicit acknowledgement, acknowledge the previously 
fetched records
-            acknowledgeBatchIfImplicitAcknowledgement(false);
+            acknowledgeBatchIfImplicitAcknowledgement();
 
             Map<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = 
acknowledgementsToSend();
             if (!acknowledgementsMap.isEmpty()) {
@@ -1040,29 +1032,11 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     }
 
     /**
-     * Called to progressively move the acknowledgement mode into IMPLICIT if 
it is not known to be EXPLICIT.
      * If the acknowledgement mode is IMPLICIT, acknowledges all records in 
the current batch.
-     *
-     * @param calledOnPoll If true, called on poll. Otherwise, called on 
commit.
      */
-    private void acknowledgeBatchIfImplicitAcknowledgement(boolean 
calledOnPoll) {
-        if (calledOnPoll) {
-            if (acknowledgementMode == AcknowledgementMode.UNKNOWN) {
-                // The first call to poll(Duration) moves into PENDING
-                acknowledgementMode = AcknowledgementMode.PENDING;
-            } else if (acknowledgementMode == AcknowledgementMode.PENDING && 
!currentFetch.isEmpty()) {
-                // If there are records to acknowledge and PENDING, moves into 
IMPLICIT
-                acknowledgementMode = AcknowledgementMode.IMPLICIT;
-            }
-        } else {
-            // If there are records to acknowledge and PENDING, moves into 
IMPLICIT
-            if (acknowledgementMode == AcknowledgementMode.PENDING && 
!currentFetch.isEmpty()) {
-                acknowledgementMode = AcknowledgementMode.IMPLICIT;
-            }
-        }
-
+    private void acknowledgeBatchIfImplicitAcknowledgement() {
         // If IMPLICIT, acknowledge all records
-        if (acknowledgementMode == AcknowledgementMode.IMPLICIT) {
+        if (acknowledgementMode == ShareAcknowledgementMode.IMPLICIT) {
             currentFetch.acknowledgeAll(AcknowledgeType.ACCEPT);
         }
     }
@@ -1075,36 +1049,20 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     }
 
     /**
-     * Called to move the acknowledgement mode into EXPLICIT, if it is not 
known to be IMPLICIT.
+     * Called to verify if the acknowledgement mode is EXPLICIT, else throws 
an exception.
      */
     private void ensureExplicitAcknowledgement() {
-        if (acknowledgementMode == AcknowledgementMode.PENDING) {
-            // If poll(Duration) has been called once, moves into EXPLICIT
-            acknowledgementMode = AcknowledgementMode.EXPLICIT;
-        } else if (acknowledgementMode == AcknowledgementMode.IMPLICIT) {
+        if (acknowledgementMode == ShareAcknowledgementMode.IMPLICIT) {
             throw new IllegalStateException("Implicit acknowledgement of 
delivery is being used.");
-        } else if (acknowledgementMode == AcknowledgementMode.UNKNOWN) {
-            throw new IllegalStateException("Acknowledge called before poll.");
         }
     }
 
     /**
      * Initializes the acknowledgement mode based on the configuration.
      */
-    private static AcknowledgementMode 
initializeAcknowledgementMode(ConsumerConfig config, Logger log) {
-        if (config == null) {
-            return AcknowledgementMode.UNKNOWN;
-        }
-        String acknowledgementModeStr = 
config.getString(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
-        if ((acknowledgementModeStr == null) || 
acknowledgementModeStr.isEmpty()) {
-            return AcknowledgementMode.UNKNOWN;
-        } else if (acknowledgementModeStr.equalsIgnoreCase("implicit")) {
-            return AcknowledgementMode.IMPLICIT;
-        } else if (acknowledgementModeStr.equalsIgnoreCase("explicit")) {
-            return AcknowledgementMode.EXPLICIT;
-        }
-        log.warn("Invalid value for config {}: \"{}\"", 
ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
acknowledgementModeStr);
-        return AcknowledgementMode.UNKNOWN;
+    private static ShareAcknowledgementMode 
initializeAcknowledgementMode(ConsumerConfig config, Logger log) {
+        String s = 
config.getString(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
+        return ShareAcknowledgementMode.fromString(s);
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementModeTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementModeTest.java
new file mode 100644
index 00000000000..1ae0df0634d
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareAcknowledgementModeTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class ShareAcknowledgementModeTest {
+
+    @Test
+    public void testFromString() {
+        assertEquals(ShareAcknowledgementMode.IMPLICIT, 
ShareAcknowledgementMode.fromString("implicit"));
+        assertEquals(ShareAcknowledgementMode.EXPLICIT, 
ShareAcknowledgementMode.fromString("explicit"));
+        assertThrows(IllegalArgumentException.class, () -> 
ShareAcknowledgementMode.fromString("invalid"));
+        assertThrows(IllegalArgumentException.class, () -> 
ShareAcknowledgementMode.fromString("IMPLICIT"));
+        assertThrows(IllegalArgumentException.class, () -> 
ShareAcknowledgementMode.fromString("EXPLICIT"));
+        assertThrows(IllegalArgumentException.class, () -> 
ShareAcknowledgementMode.fromString(""));
+        assertThrows(IllegalArgumentException.class, () -> 
ShareAcknowledgementMode.fromString(null));
+    }
+
+    @Test
+    public void testValidator() {
+        ShareAcknowledgementMode.Validator validator = new 
ShareAcknowledgementMode.Validator();
+        assertDoesNotThrow(() -> validator.ensureValid("test", "implicit"));
+        assertDoesNotThrow(() -> validator.ensureValid("test", "explicit"));
+        assertThrows(ConfigException.class, () -> 
validator.ensureValid("test", "invalid"));
+        assertThrows(ConfigException.class, () -> 
validator.ensureValid("test", "IMPLICIT"));
+        assertThrows(ConfigException.class, () -> 
validator.ensureValid("test", "EXPLICIT"));
+        assertThrows(ConfigException.class, () -> 
validator.ensureValid("test", ""));
+        assertThrows(ConfigException.class, () -> 
validator.ensureValid("test", null));
+    }
+
+    @Test
+    public void testEqualsAndHashCode() {
+        ShareAcknowledgementMode mode1 = ShareAcknowledgementMode.IMPLICIT;
+        ShareAcknowledgementMode mode2 = ShareAcknowledgementMode.IMPLICIT;
+        ShareAcknowledgementMode mode3 = ShareAcknowledgementMode.EXPLICIT;
+
+        assertEquals(mode1, mode2);
+        assertNotEquals(mode1, mode3);
+        assertNotEquals(mode2, mode3);
+
+        assertEquals(mode1.hashCode(), mode2.hashCode());
+        assertNotEquals(mode1.hashCode(), mode3.hashCode());
+    }
+
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 342540c5885..64c729730fb 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
@@ -56,6 +57,7 @@ import org.mockito.Mockito;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -145,14 +147,16 @@ public class ShareConsumerImplTest {
                 mock(ShareFetchBuffer.class),
                 subscriptions,
                 "group-id",
-                "client-id");
+                "client-id",
+                "implicit");
     }
 
     private ShareConsumerImpl<String, String> newConsumer(
             ShareFetchBuffer fetchBuffer,
             SubscriptionState subscriptions,
             String groupId,
-            String clientId
+            String clientId,
+            String acknowledgementMode
     ) {
         final int defaultApiTimeoutMs = 1000;
         final int requestTimeoutMs = 30000;
@@ -173,7 +177,8 @@ public class ShareConsumerImplTest {
                 metadata,
                 requestTimeoutMs,
                 defaultApiTimeoutMs,
-                groupId
+                groupId,
+                acknowledgementMode
         );
     }
 
@@ -345,6 +350,84 @@ public class ShareConsumerImplTest {
         assertDoesNotThrow(() -> consumer.close());
     }
 
+    @Test
+    public void testExplicitModeUnacknowledgedRecords() {
+        // Setup consumer with explicit acknowledgement mode
+        SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
+        consumer = newConsumer(
+                mock(ShareFetchBuffer.class),
+                subscriptions,
+                "group-id",
+                "client-id",
+                "explicit");
+
+        // Setup test data
+        String topic = "test-topic";
+        int partition = 0;
+        TopicIdPartition tip = new TopicIdPartition(Uuid.randomUuid(), 
partition, topic);
+        ShareInFlightBatch<String, String> batch = new ShareInFlightBatch<>(0, 
tip);
+        batch.addRecord(new ConsumerRecord<>(topic, partition, 0, "key1", 
"value1"));
+        batch.addRecord(new ConsumerRecord<>(topic, partition, 1, "key2", 
"value2"));
+
+        // Setup first fetch to return records
+        ShareFetch<String, String> firstFetch = ShareFetch.empty();
+        firstFetch.add(tip, batch);
+        doReturn(firstFetch)
+            .doReturn(ShareFetch.empty())
+            .when(fetchCollector)
+            .collect(any(ShareFetchBuffer.class));
+
+        // Setup subscription
+        List<String> topics = Collections.singletonList(topic);
+        
completeShareSubscriptionChangeApplicationEventSuccessfully(subscriptions, 
topics);
+        consumer.subscribe(topics);
+
+        // First poll should succeed and return records
+        ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(100));
+        assertEquals(2, records.count(), "Should have received 2 records");
+
+        // Second poll should fail because records weren't acknowledged
+        IllegalStateException exception = assertThrows(
+            IllegalStateException.class,
+            () -> consumer.poll(Duration.ofMillis(100))
+        );
+        assertTrue(
+            exception.getMessage().contains("All records must be acknowledged 
in explicit acknowledgement mode."),
+            "Unexpected error message: " + exception.getMessage()
+        );
+
+        // Verify that acknowledging one record but not all still throws 
exception
+        Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
+        consumer.acknowledge(iterator.next());
+        exception = assertThrows(
+            IllegalStateException.class,
+            () -> consumer.poll(Duration.ofMillis(100))
+        );
+        assertTrue(
+            exception.getMessage().contains("All records must be acknowledged 
in explicit acknowledgement mode."),
+            "Unexpected error message: " + exception.getMessage()
+        );
+
+        // Verify that after acknowledging all records, poll succeeds
+        consumer.acknowledge(iterator.next());
+        
+        // Setup second fetch to return new records
+        ShareFetch<String, String> secondFetch = ShareFetch.empty();
+        ShareInFlightBatch<String, String> newBatch = new 
ShareInFlightBatch<>(2, tip);
+        newBatch.addRecord(new ConsumerRecord<>(topic, partition, 2, "key3", 
"value3"));
+        newBatch.addRecord(new ConsumerRecord<>(topic, partition, 3, "key4", 
"value4"));
+        secondFetch.add(tip, newBatch);
+        
+        // Reset mock to return new records
+        doReturn(secondFetch)
+            .when(fetchCollector)
+            .collect(any(ShareFetchBuffer.class));
+
+        // Verify that poll succeeds and returns new records
+        ConsumerRecords<String, String> newRecords = 
consumer.poll(Duration.ofMillis(100));
+        assertEquals(2, newRecords.count(), "Should have received 2 new 
records");
+    }
+
     @Test
     public void testCloseWithTopicAuthorizationException() {
         SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
index 699397f9a70..43ba5185dc0 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.tools.consumer;
 
 import org.apache.kafka.clients.consumer.AcknowledgeType;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaShareConsumer;
 import org.apache.kafka.clients.consumer.ShareConsumer;
@@ -36,6 +37,7 @@ import java.time.Duration;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
 
@@ -66,7 +68,11 @@ public class ConsoleShareConsumer {
         messageCount = 0;
         long timeoutMs = opts.timeoutMs() >= 0 ? opts.timeoutMs() : 
Long.MAX_VALUE;
 
-        ShareConsumer<byte[], byte[]> consumer = new 
KafkaShareConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new 
ByteArrayDeserializer());
+        Properties consumerProps = opts.consumerProps();
+        // Set share acknowledgement mode to explicit.
+        consumerProps.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
"explicit");
+
+        ShareConsumer<byte[], byte[]> consumer = new 
KafkaShareConsumer<>(consumerProps, new ByteArrayDeserializer(), new 
ByteArrayDeserializer());
         ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts.topicArg(), 
consumer, timeoutMs);
 
         addShutdownHook(consumerWrapper);

Reply via email to