This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 87657fdfc72 KAFKA-19827: Call ack commit callback at end of waiting 
calls (#20758)
87657fdfc72 is described below

commit 87657fdfc721055835f5b1f22151c461e85eab4a
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Oct 24 08:56:37 2025 +0100

    KAFKA-19827: Call ack commit callback at end of waiting calls (#20758)
    
    The acknowledgement commit callback in the share consumer gets called on
    the application thread at the start of the poll, commitSync and
    commitAsync methods. Specifically in the peculiar case of using the
    callback together with commitSync, the acknowledgement callback for the
    committed records is called at the start of the next eligible call, even
    though the information is already known at the end of the commitSync's
    execution. The results are correct already, but the timing could be
    improved in some situations.
    
    Reviewers: Apoorv Mittal <[email protected]>, Shivsundar R
    <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  | 39 +++++++++++++++++++---
 .../consumer/internals/ShareConsumerImpl.java      | 14 +++++++-
 2 files changed, 48 insertions(+), 5 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 998ac2c585d..391796fc5dc 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -357,10 +357,41 @@ public class ShareConsumerTest {
             TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
                 DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records 
for share consumer");
 
-            TestUtils.waitForCondition(() -> {
-                shareConsumer.poll(Duration.ofMillis(500));
-                return partitionOffsetsMap.containsKey(tp);
-            }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive call to 
callback");
+            // The callback should be called before the return of the poll, 
even when there are no more records.
+            ConsumerRecords<byte[], byte[]> records = 
shareConsumer.poll(Duration.ofMillis(2000));
+            assertEquals(0, records.count());
+            assertTrue(partitionOffsetsMap.containsKey(tp));
+
+            // We expect no exception as the acknowledgement error code is 
null.
+            assertFalse(partitionExceptionMap.containsKey(tp));
+            verifyShareGroupStateTopicRecordsProduced();
+        }
+    }
+
+    @ClusterTest
+    public void 
testAcknowledgementCommitCallbackSuccessfulAcknowledgementOnCommitSync() throws 
Exception {
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = 
createShareConsumer("group1")) {
+
+            Map<TopicPartition, Set<Long>> partitionOffsetsMap = new 
HashMap<>();
+            Map<TopicPartition, Exception> partitionExceptionMap = new 
HashMap<>();
+            ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), 
"value".getBytes());
+
+            producer.send(record);
+            producer.flush();
+
+            shareConsumer.setAcknowledgementCommitCallback(new 
TestableAcknowledgementCommitCallback(partitionOffsetsMap, 
partitionExceptionMap));
+            shareConsumer.subscribe(Set.of(tp.topic()));
+
+            TestUtils.waitForCondition(() -> 
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
+                DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records 
for share consumer");
+
+            // The acknowledgement commit callback should be called before the 
commitSync returns
+            // once the records have been confirmed to have been acknowledged.
+            Map<TopicIdPartition, Optional<KafkaException>> result = 
shareConsumer.commitSync();
+            assertEquals(1, result.size());
+            assertTrue(partitionOffsetsMap.containsKey(tp));
 
             // We expect no exception as the acknowledgement error code is 
null.
             assertFalse(partitionExceptionMap.containsKey(tp));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 79697503256..4a7e19a6e56 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -609,6 +609,15 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             throw e.cause();
         } finally {
             kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs());
+
+            // Handle any acknowledgements which completed while we were 
waiting, but do not throw
+            // the exception because the fetched records would then not be 
returned to the caller
+            try {
+                handleCompletedAcknowledgements(false);
+            } catch (Throwable t) {
+                log.warn("Exception thrown in acknowledgement commit 
callback", t);
+            }
+
             release();
         }
     }
@@ -752,8 +761,11 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                             result.put(tip, Optional.of(exception));
                         }
                     });
-                    return result;
 
+                    // Handle any acknowledgements which completed while we 
were waiting
+                    handleCompletedAcknowledgements(false);
+
+                    return result;
                 } finally {
                     wakeupTrigger.clearTask();
                 }

Reply via email to