This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6057c2d8c21 KAFKA-20052: Share commitSync for deleted topic (#21265)
6057c2d8c21 is described below
commit 6057c2d8c21216b0090b81f65598c2d94ac2a497
Author: Andrew Schofield <[email protected]>
AuthorDate: Wed Jan 7 20:08:29 2026 +0000
KAFKA-20052: Share commitSync for deleted topic (#21265)
When a topic with outstanding acknowledgements is deleted,
SHARE_ACKNOWLEDGE fails with a topic-level UNKNOWN_TOPIC_ID error code.
This error code was not being handled as an immediate failure, which
meant that the request would be retried until it timed out. As a result,
`ShareConsumer.commitSync()` failed with a `TimeoutException` for the
partition, rather than `UnknownTopicIdException`. UNKNOWN_TOPIC_ID is now
handled in the same way as UNKNOWN_TOPIC_OR_PARTITION: the failed
acknowledgements are not retried.
In addition, `ShareInFlightBatch` did not check whether `renewingRecords`
was null before processing completed acknowledgements, which meant that a
NullPointerException occurred as the timeout was delivered.
Reviewers: Apoorv Mittal <[email protected]>, Chirag Wadhwa
<[email protected]>
---
.../kafka/clients/consumer/ShareConsumerTest.java | 32 ++++++++++++++++++++++
.../internals/ShareConsumeRequestManager.java | 13 +++++----
.../consumer/internals/ShareInFlightBatch.java | 22 ++++++++-------
3 files changed, 51 insertions(+), 16 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
index bd62d889083..32b05351e98 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java
@@ -49,6 +49,7 @@ import
org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnknownTopicIdException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
@@ -2926,6 +2927,37 @@ public class ShareConsumerTest {
}
}
+ @ClusterTest
+ public void testCommitSyncFailsForDeletedTopic() throws
InterruptedException {
+ Uuid topicId = createTopic("baz", 1, 1);
+ alterShareAutoOffsetReset("group1", "earliest");
+ try (Producer<byte[], byte[]> producer = createProducer();
+ ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer(
+ "group1",
+ Map.of(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
EXPLICIT))
+ ) {
+ for (int i = 0; i < 10; i++) {
+ ProducerRecord<byte[], byte[]> record = new
ProducerRecord<>("baz", 0, null, "key".getBytes(), ("Message " + i).getBytes());
+ producer.send(record);
+ }
+ producer.flush();
+
+ shareConsumer.subscribe(List.of("baz"));
+ ConsumerRecords<byte[], byte[]> records =
waitedPoll(shareConsumer, 2500L, 10);
+ assertEquals(10, records.count());
+
+ records.forEach(shareConsumer::acknowledge);
+
+ // Topic deletion does not necessarily become apparent across the
cluster immediately, so sleep a short while
+ deleteTopic("baz");
+ Thread.sleep(5000);
+
+ Map<TopicIdPartition, Optional<KafkaException>> commitResult =
shareConsumer.commitSync();
+ assertEquals(1, commitResult.size());
+ assertInstanceOf(UnknownTopicIdException.class,
commitResult.get(new TopicIdPartition(topicId, 0, "baz")).get());
+ }
+ }
+
@ClusterTest
public void testRenewAcknowledgementOnPoll() {
alterShareAutoOffsetReset("group1", "earliest");
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
index e57265716f5..c30073d4104 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java
@@ -1056,7 +1056,8 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
Optional<Integer>
acquisitionLockTimeoutMs) {
if (partitionError.exception() != null) {
boolean retry = false;
- if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER ||
partitionError == Errors.FENCED_LEADER_EPOCH || partitionError ==
Errors.UNKNOWN_TOPIC_OR_PARTITION) {
+ if (partitionError == Errors.NOT_LEADER_OR_FOLLOWER ||
partitionError == Errors.FENCED_LEADER_EPOCH ||
+ partitionError == Errors.UNKNOWN_TOPIC_OR_PARTITION ||
partitionError == Errors.UNKNOWN_TOPIC_ID) {
// If the leader has changed, there's no point in retrying the
operation because the acquisition locks
// will have been released.
// If the topic or partition has been deleted, we do not retry
the failed acknowledgements.
@@ -1318,11 +1319,11 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
* Sets the error code in the acknowledgements and sends the response
* through a background event.
*/
- void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors
acknowledgeErrorCode, boolean isRenewAck, Optional<Integer>
acquisitionLockTimeoutMs) {
+ void handleAcknowledgeErrorCode(TopicIdPartition tip, Errors
acknowledgeErrorCode, boolean checkForRenewAcknowledgements, Optional<Integer>
acquisitionLockTimeoutMs) {
Acknowledgements acks = inFlightAcknowledgements.remove(tip);
if (acks != null) {
acks.complete(acknowledgeErrorCode.exception());
- resultHandler.complete(tip, acks, requestType, isRenewAck,
acquisitionLockTimeoutMs);
+ resultHandler.complete(tip, acks, requestType,
checkForRenewAcknowledgements, acquisitionLockTimeoutMs);
} else {
log.error("Invalid partition {} received in ShareAcknowledge
response", tip);
}
@@ -1454,17 +1455,17 @@ public class ShareConsumeRequestManager implements
RequestManager, MemberStateLi
* Handle the result of a ShareAcknowledge request sent to one or more
nodes and
* signal the completion when all results are known.
*/
- public void complete(TopicIdPartition partition, Acknowledgements
acknowledgements, AcknowledgeRequestType type, boolean isRenewAck,
Optional<Integer> acquisitionLockTimeoutMs) {
+ public void complete(TopicIdPartition partition, Acknowledgements
acknowledgements, AcknowledgeRequestType type, boolean
checkForRenewAcknowledgements, Optional<Integer> acquisitionLockTimeoutMs) {
if (type.equals(AcknowledgeRequestType.COMMIT_ASYNC)) {
if (acknowledgements != null) {
- maybeSendShareAcknowledgementEvent(Map.of(partition,
acknowledgements), isRenewAck, acquisitionLockTimeoutMs);
+ maybeSendShareAcknowledgementEvent(Map.of(partition,
acknowledgements), checkForRenewAcknowledgements, acquisitionLockTimeoutMs);
}
} else {
if (acknowledgements != null) {
result.put(partition, acknowledgements);
}
if (remainingResults != null &&
remainingResults.decrementAndGet() == 0) {
- maybeSendShareAcknowledgementEvent(result, isRenewAck,
acquisitionLockTimeoutMs);
+ maybeSendShareAcknowledgementEvent(result,
checkForRenewAcknowledgements, acquisitionLockTimeoutMs);
future.ifPresent(future -> future.complete(result));
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
index ae0baa6c1c7..8400202d85a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareInFlightBatch.java
@@ -148,16 +148,18 @@ public class ShareInFlightBatch<K, V> {
int recordsRenewed = 0;
boolean isCompletedExceptionally =
acknowledgements.isCompletedExceptionally();
if (acknowledgements.isCompleted()) {
- Map<Long, AcknowledgeType> ackTypeMap =
acknowledgements.getAcknowledgementsTypeMap();
- for (Map.Entry<Long, AcknowledgeType> ackTypeEntry :
ackTypeMap.entrySet()) {
- long offset = ackTypeEntry.getKey();
- AcknowledgeType ackType = ackTypeEntry.getValue();
- ConsumerRecord<K, V> record = renewingRecords.remove(offset);
- if (ackType == AcknowledgeType.RENEW) {
- if (record != null && !isCompletedExceptionally) {
- // The record is moved into renewed state, and will
then become in-flight later.
- renewedRecords.put(offset, record);
- recordsRenewed++;
+ if (renewingRecords != null) {
+ Map<Long, AcknowledgeType> ackTypeMap =
acknowledgements.getAcknowledgementsTypeMap();
+ for (Map.Entry<Long, AcknowledgeType> ackTypeEntry :
ackTypeMap.entrySet()) {
+ long offset = ackTypeEntry.getKey();
+ AcknowledgeType ackType = ackTypeEntry.getValue();
+ ConsumerRecord<K, V> record =
renewingRecords.remove(offset);
+ if (ackType == AcknowledgeType.RENEW) {
+ if (record != null && !isCompletedExceptionally) {
+ // The record is moved into renewed state, and
will then become in-flight later.
+ renewedRecords.put(offset, record);
+ recordsRenewed++;
+ }
}
}
}