This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new c7db98e15e4 KAFKA-18829: Added check before converting to IMPLICIT 
mode (#18964) (Cherry-pick) (#18982)
c7db98e15e4 is described below

commit c7db98e15e4c85c37610e768b635676a3c0d755a
Author: Shivsundar R <[email protected]>
AuthorDate: Fri Feb 21 04:28:41 2025 -0500

    KAFKA-18829: Added check before converting to IMPLICIT mode (#18964) 
(Cherry-pick) (#18982)
    
    Cherry-picked
    
https://github.com/apache/kafka/commit/3603c8fe35d2602db97979a3b24025fc90873069
    into 4.0.
    This was a bug fix to address
    https://issues.apache.org/jira/browse/KAFKA-18829.
    Now, `ShareConsumerImpl` only moves to IMPLICIT mode if there are
    records to be acknowledged when the next
    `poll()`/`commitAsync()`/`commitSync()` is called.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../consumer/internals/ShareConsumerImpl.java      |  4 +--
 .../java/kafka/test/api/ShareConsumerTest.java     | 37 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 4a39c75745e..714d0761438 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -1000,8 +1000,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             if (acknowledgementMode == AcknowledgementMode.UNKNOWN) {
                 // The first call to poll(Duration) moves into PENDING
                 acknowledgementMode = AcknowledgementMode.PENDING;
-            } else if (acknowledgementMode == AcknowledgementMode.PENDING) {
-                // The second call to poll(Duration) if PENDING moves into 
IMPLICIT
+            } else if (acknowledgementMode == AcknowledgementMode.PENDING && 
!currentFetch.isEmpty()) {
+                // If there are records to acknowledge and PENDING, moves into 
IMPLICIT
                 acknowledgementMode = AcknowledgementMode.IMPLICIT;
             }
         } else {
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java 
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index 89b357cf4c3..7b3f468c05c 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -656,6 +656,43 @@ public class ShareConsumerTest {
         }
     }
 
+    @ParameterizedTest(name = "{displayName}.persister={0}")
+    @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+    public void testImplicitModeNotTriggeredByPollWhenNoAcksToSend(String 
persister) throws InterruptedException {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (KafkaProducer<byte[], byte[]> producer = createProducer(new 
ByteArraySerializer(), new ByteArraySerializer());
+             KafkaShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
"group1")) {
+
+            shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+            Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new 
HashMap<>();
+            Map<TopicPartition, Exception> partitionExceptionMap1 = new 
HashMap<>();
+            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap1, 
partitionExceptionMap1));
+
+            // The acknowledgement mode moves to PENDING from UNKNOWN.
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(5000));
+            assertEquals(0, records.count());
+            shareConsumer.commitAsync();
+
+            ProducerRecord<byte[], byte[]> record1 = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+            producer.send(record1);
+            producer.flush();
+
+            // The acknowledgement mode remains in PENDING because the previous 
poll returned no records to acknowledge.
+            records = shareConsumer.poll(Duration.ofMillis(5000));
+            assertEquals(1, records.count());
+
+            // The acknowledgement mode now moves to EXPLICIT.
+            shareConsumer.acknowledge(records.iterator().next());
+            shareConsumer.commitAsync();
+
+            TestUtils.waitForCondition(() -> {
+                shareConsumer.poll(Duration.ofMillis(500));
+                return partitionExceptionMap1.containsKey(tp);
+            }, 30000, 100L, () -> "Didn't receive call to callback");
+        }
+    }
+
     @Flaky("KAFKA-18033")
     @ParameterizedTest(name = "{displayName}.persister={0}")
     @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})

Reply via email to