This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new e1bb4a2e320 KAFKA-20052: Share commitSync for deleted topic (#21265)
e1bb4a2e320 is described below

commit e1bb4a2e320c5afd87bf3af235d4923f74ade433
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Jan 7 20:08:29 2026 +0000

    KAFKA-20052: Share commitSync for deleted topic (#21265)
    
    When a topic with outstanding acknowledgements is deleted,
    SHARE_ACKNOWLEDGE fails with a topic-level UNKNOWN_TOPIC_ID error code.
    This error code was not being handled as an immediate failure, which
    meant that the request would be retried until it timed out. As a result,
    `ShareConsumer.commitSync()` failed with a `TimeoutException` for the
    partition, rather than `UnknownTopicIdException`.
    
    In addition, there was a mistake in `ShareInFlightBatch` which meant
    that an NPE occurred as the timeout was delivered.
    
    Reviewers: Apoorv Mittal <[email protected]>, Chirag Wadhwa
    <[email protected]>
---
 .../kafka/clients/consumer/ShareConsumerTest.java  | 32 ++++++++++++++++++++++
 .../internals/ShareConsumeRequestManager.java      | 13 +++++----
 .../consumer/internals/ShareInFlightBatch.java     | 22 ++++++++-------
 3 files changed, 51 insertions(+), 16 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index 85fa049636e..d8095838fa0 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnknownTopicIdException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
@@ -2924,6 +2925,37 @@ public class ShareConsumerTest {
         }
     }
 
+    @ClusterTest
+    public void testCommitSyncFailsForDeletedTopic() throws 
InterruptedException {
+        Uuid topicId = createTopic("baz", 1, 1);
+        alterShareAutoOffsetReset("group1", "earliest");
+        try (Producer<byte[], byte[]> producer = createProducer();
+             ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+                 "group1",
+                 Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, 
EXPLICIT))
+        ) {
+            for (int i = 0; i < 10; i++) {
+                ProducerRecord<byte[], byte[]> record = new 
ProducerRecord<>("baz", 0, null, "key".getBytes(), ("Message " + i).getBytes());
+                producer.send(record);
+            }
+            producer.flush();
+
+            shareConsumer.subscribe(List.of("baz"));
+            ConsumerRecords<byte[], byte[]> records = 
waitedPoll(shareConsumer, 2500L, 10);
+            assertEquals(10, records.count());
+
+            records.forEach(shareConsumer::acknowledge);
+
+            // Topic deletion does not necessarily become apparent across the cluster immediately, so sleep a short while
+            deleteTopic("baz");
+            Thread.sleep(5000);
+
+            Map<TopicIdPartition, Optional<KafkaException>> commitResult = 
shareConsumer.commitSync();
+            assertEquals(1, commitResult.size());
+            assertInstanceOf(UnknownTopicIdException.class, 
commitResult.get(new TopicIdPartition(topicId, 0, "baz")).get());
+        }
+    }
+
     @ClusterTest
     public void testRenewAcknowledgementOnPoll() {
         alterShareAutoOffsetReset("group1", "earliest");
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index e57265716f5..c30073d4104 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -1056,7 +1056,8 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
                                       Optional<Integer> 
acquisitionLockTimeoutMs) {
         if (partitionError.exception() != null) {
             boolean retry = false;
-            if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || 
partitionError == Errors.FENCED_LEADER_EPOCH || partitionError == 
Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+            if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER || 
partitionError == Errors.FENCED_LEADER_EPOCH ||
+                partitionError == Errors.UNKNOWN_TOPIC_OR_PARTITION || 
partitionError == Errors.UNKNOWN_TOPIC_ID) {
                 // If the leader has changed, there's no point in retrying the operation because the acquisition locks
                 // will have been released.
                 // If the topic or partition has been deleted, we do not retry the failed acknowledgements.
@@ -1318,11 +1319,11 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
          * Sets the error code in the acknowledgements and sends the response
          * through a background event.
          */
-        void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors 
acknowledgeErrorCode, boolean isRenewAck, Optional<Integer> 
acquisitionLockTimeoutMs) {
+        void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors 
acknowledgeErrorCode, boolean checkForRenewAcknowledgements, Optional<Integer> 
acquisitionLockTimeoutMs) {
             Acknowledgements acks = inFlightAcknowledgements.remove(tip);
             if (acks != null) {
                 acks.complete(acknowledgeErrorCode.exception());
-                resultHandler.complete(tip, acks, requestType, isRenewAck, 
acquisitionLockTimeoutMs);
+                resultHandler.complete(tip, acks, requestType, 
checkForRenewAcknowledgements, acquisitionLockTimeoutMs);
             } else {
                 log.error("Invalid partition {} received in ShareAcknowledge 
response", tip);
             }
@@ -1454,17 +1455,17 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
          * Handle the result of a ShareAcknowledge request sent to one or more nodes and
          * signal the completion when all results are known.
          */
-        public void complete(TopicIdPartition partition, Acknowledgements 
acknowledgements, AcknowledgeRequestType type, boolean isRenewAck, 
Optional<Integer> acquisitionLockTimeoutMs) {
+        public void complete(TopicIdPartition partition, Acknowledgements 
acknowledgements, AcknowledgeRequestType type, boolean 
checkForRenewAcknowledgements, Optional<Integer> acquisitionLockTimeoutMs) {
             if (type.equals(AcknowledgeRequestType.COMMIT_ASYNC)) {
                 if (acknowledgements != null) {
-                    maybeSendShareAcknowledgementEvent(Map.of(partition, 
acknowledgements), isRenewAck, acquisitionLockTimeoutMs);
+                    maybeSendShareAcknowledgementEvent(Map.of(partition, 
acknowledgements), checkForRenewAcknowledgements, acquisitionLockTimeoutMs);
                 }
             } else {
                 if (acknowledgements != null) {
                     result.put(partition, acknowledgements);
                 }
                 if (remainingResults != null && 
remainingResults.decrementAndGet() == 0) {
-                    maybeSendShareAcknowledgementEvent(result, isRenewAck, 
acquisitionLockTimeoutMs);
+                    maybeSendShareAcknowledgementEvent(result, 
checkForRenewAcknowledgements, acquisitionLockTimeoutMs);
                     future.ifPresent(future -> future.complete(result));
                 }
             }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
index ae0baa6c1c7..8400202d85a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
@@ -148,16 +148,18 @@ public class ShareInFlightBatch<K, V> {
         int recordsRenewed = 0;
         boolean isCompletedExceptionally = 
acknowledgements.isCompletedExceptionally();
         if (acknowledgements.isCompleted()) {
-            Map<Long, AcknowledgeType> ackTypeMap = 
acknowledgements.getAcknowledgementsTypeMap();
-            for (Map.Entry<Long, AcknowledgeType> ackTypeEntry : 
ackTypeMap.entrySet()) {
-                long offset = ackTypeEntry.getKey();
-                AcknowledgeType ackType = ackTypeEntry.getValue();
-                ConsumerRecord<K, V> record = renewingRecords.remove(offset);
-                if (ackType == AcknowledgeType.RENEW) {
-                    if (record != null && !isCompletedExceptionally) {
-                        // The record is moved into renewed state, and will then become in-flight later.
-                        renewedRecords.put(offset, record);
-                        recordsRenewed++;
+            if (renewingRecords != null) {
+                Map<Long, AcknowledgeType> ackTypeMap = 
acknowledgements.getAcknowledgementsTypeMap();
+                for (Map.Entry<Long, AcknowledgeType> ackTypeEntry : 
ackTypeMap.entrySet()) {
+                    long offset = ackTypeEntry.getKey();
+                    AcknowledgeType ackType = ackTypeEntry.getValue();
+                    ConsumerRecord<K, V> record = 
renewingRecords.remove(offset);
+                    if (ackType == AcknowledgeType.RENEW) {
+                        if (record != null && !isCompletedExceptionally) {
+                            // The record is moved into renewed state, and will then become in-flight later.
+                            renewedRecords.put(offset, record);
+                            recordsRenewed++;
+                        }
                     }
                 }
             }

Reply via email to