This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new c7db98e15e4 KAFKA-18829: Added check before converting to IMPLICIT
mode (#18964) (Cherry-pick) (#18982)
c7db98e15e4 is described below
commit c7db98e15e4c85c37610e768b635676a3c0d755a
Author: Shivsundar R <[email protected]>
AuthorDate: Fri Feb 21 04:28:41 2025 -0500
KAFKA-18829: Added check before converting to IMPLICIT mode (#18964)
(Cherry-pick) (#18982)
Cherry-picked
https://github.com/apache/kafka/commit/3603c8fe35d2602db97979a3b24025fc90873069
into 4.0.
This was a bug fix to address
https://issues.apache.org/jira/browse/KAFKA-18829.
Now, we will only move to IMPLICIT mode in `ShareConsumerImpl`, if there
were any records to be acknowledged, and if the next
`poll()`/`commitAsync()`/`commitSync()` was called.
Reviewers: Andrew Schofield <[email protected]>
---
.../consumer/internals/ShareConsumerImpl.java | 4 +--
.../java/kafka/test/api/ShareConsumerTest.java | 37 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 4a39c75745e..714d0761438 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -1000,8 +1000,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
if (acknowledgementMode == AcknowledgementMode.UNKNOWN) {
// The first call to poll(Duration) moves into PENDING
acknowledgementMode = AcknowledgementMode.PENDING;
- } else if (acknowledgementMode == AcknowledgementMode.PENDING) {
- // The second call to poll(Duration) if PENDING moves into
IMPLICIT
+ } else if (acknowledgementMode == AcknowledgementMode.PENDING &&
!currentFetch.isEmpty()) {
+ // If there are records to acknowledge and PENDING, moves into
IMPLICIT
acknowledgementMode = AcknowledgementMode.IMPLICIT;
}
} else {
diff --git a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
index 89b357cf4c3..7b3f468c05c 100644
--- a/core/src/test/java/kafka/test/api/ShareConsumerTest.java
+++ b/core/src/test/java/kafka/test/api/ShareConsumerTest.java
@@ -656,6 +656,43 @@ public class ShareConsumerTest {
}
}
+ @ParameterizedTest(name = "{displayName}.persister={0}")
+ @ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})
+ public void testImplicitModeNotTriggeredByPollWhenNoAcksToSend(String
persister) throws InterruptedException {
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (KafkaProducer<byte[], byte[]> producer = createProducer(new
ByteArraySerializer(), new ByteArraySerializer());
+ KafkaShareConsumer<byte[], byte[]> shareConsumer =
createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(),
"group1")) {
+
+ shareConsumer.subscribe(Collections.singleton(tp.topic()));
+
+ Map<TopicPartition, Set<Long>> partitionOffsetsMap1 = new
HashMap<>();
+ Map<TopicPartition, Exception> partitionExceptionMap1 = new
HashMap<>();
+ shareConsumer.setAcknowledgementCommitCallback(new
TestableAcknowledgementCommitCallback(partitionOffsetsMap1,
partitionExceptionMap1));
+
+ // The acknowledgement mode moves to PENDING from UNKNOWN.
+ ConsumerRecords<byte[], byte[]> records =
shareConsumer.poll(Duration.ofMillis(5000));
+ assertEquals(0, records.count());
+ shareConsumer.commitAsync();
+
+ ProducerRecord<byte[], byte[]> record1 = new
ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(),
"value".getBytes());
+ producer.send(record1);
+ producer.flush();
+
+ // The acknowledgement mode remains in PENDING because no records
were returned.
+ records = shareConsumer.poll(Duration.ofMillis(5000));
+ assertEquals(1, records.count());
+
+ // The acknowledgement mode now moves to EXPLICIT.
+ shareConsumer.acknowledge(records.iterator().next());
+ shareConsumer.commitAsync();
+
+ TestUtils.waitForCondition(() -> {
+ shareConsumer.poll(Duration.ofMillis(500));
+ return partitionExceptionMap1.containsKey(tp);
+ }, 30000, 100L, () -> "Didn't receive call to callback");
+ }
+ }
+
@Flaky("KAFKA-18033")
@ParameterizedTest(name = "{displayName}.persister={0}")
@ValueSource(strings = {NO_OP_PERSISTER, DEFAULT_STATE_PERSISTER})