This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 87657fdfc72 KAFKA-19827: Call ack commit callback at end of waiting
calls (#20758)
87657fdfc72 is described below
commit 87657fdfc721055835f5b1f22151c461e85eab4a
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Oct 24 08:56:37 2025 +0100
KAFKA-19827: Call ack commit callback at end of waiting calls (#20758)
The acknowledgement commit callback in the share consumer is called on
the application thread at the start of the poll, commitSync and
commitAsync methods. In particular, when the callback is used together
with commitSync, the acknowledgement callback for the committed records
is not called until the start of the next eligible call, even though
the outcome is already known by the time commitSync finishes executing.
The results were already correct, but the timing could be improved in
some situations. With this change, acknowledgements which completed
while the call was waiting are also handled at the end of poll and
commitSync, so the callback is called before those methods return.
Reviewers: Apoorv Mittal <[email protected]>, Shivsundar R
<[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 39 +++++++++++++++++++---
.../consumer/internals/ShareConsumerImpl.java | 14 +++++++-
2 files changed, 48 insertions(+), 5 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 998ac2c585d..391796fc5dc 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -357,10 +357,41 @@ public class ShareConsumerTest {
TestUtils.waitForCondition(() ->
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records
for share consumer");
- TestUtils.waitForCondition(() -> {
- shareConsumer.poll(Duration.ofMillis(500));
- return partitionOffsetsMap.containsKey(tp);
- }, DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to receive call to
callback");
+ // The callback should be called before the return of the poll,
even when there are no more records.
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(2000));
+ assertEquals(0, records.count());
+ assertTrue(partitionOffsetsMap.containsKey(tp));
+
+ // We expect no exception as the acknowledgement error code is
null.
+ assertFalse(partitionExceptionMap.containsKey(tp));
+ verifyShareGroupStateTopicRecordsProduced();
+ }
+ }
+
+ @ClusterTest
+ public void
testAcknowledgementCommitCallbackSuccessfulAcknowledgementOnCommitSync() throws
Exception {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer("group1")) {
+
+ Map<TopicPartition, Set<Long>> partitionOffsetsMap = new
HashMap<>();
+ Map<TopicPartition, Exception> partitionExceptionMap = new
HashMap<>();
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+
+ producer.send(record);
+ producer.flush();
+
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap,
partitionExceptionMap));
+ shareConsumer.subscribe(Set.of(tp.topic()));
+
+ TestUtils.waitForCondition(() ->
shareConsumer.poll(Duration.ofMillis(2000)).count() == 1,
+ DEFAULT_MAX_WAIT_MS, 100L, () -> "Failed to consume records
for share consumer");
+
+ // The acknowledgement commit callback should be called before the
commitSync returns
+ // once the records have been confirmed to have been acknowledged.
+ Map<TopicIdPartition, Optional<KafkaException>> result =
shareConsumer.commitSync();
+ assertEquals(1, result.size());
+ assertTrue(partitionOffsetsMap.containsKey(tp));
// We expect no exception as the acknowledgement error code is
null.
assertFalse(partitionExceptionMap.containsKey(tp));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 79697503256..4a7e19a6e56 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -609,6 +609,15 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
throw e.cause();
} finally {
kafkaShareConsumerMetrics.recordPollEnd(timer.currentTimeMs());
+
+ // Handle any acknowledgements which completed while we were
waiting, but do not throw
+ // the exception because the fetched records would then not be
returned to the caller
+ try {
+ handleCompletedAcknowledgements(false);
+ } catch (Throwable t) {
+ log.warn("Exception thrown in acknowledgement commit
callback", t);
+ }
+
release();
}
}
@@ -752,8 +761,11 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
result.put(tip, Optional.of(exception));
}
});
- return result;
+ // Handle any acknowledgements which completed while we
were waiting
+ handleCompletedAcknowledgements(false);
+
+ return result;
} finally {
wakeupTrigger.clearTask();
}