This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d6728d4b29a KAFKA-19789: Log an error when we get duplicate acquired
offsets in ShareFetchResponse. (#20752)
d6728d4b29a is described below
commit d6728d4b29ae28b4016d5b8568b8971e7892f669
Author: Shivsundar R <[email protected]>
AuthorDate: Tue Oct 28 20:44:30 2025 -0400
KAFKA-19789: Log an error when we get duplicate acquired offsets in
ShareFetchResponse. (#20752)
*What*
https://issues.apache.org/jira/browse/KAFKA-19789
- There were some scenarios where `ShareFetchResponse` contained
duplicate acquired records, this was a broker side bug.
- Although ideally this should not happen, the client was not expecting
this case and acknowledged with `GAP` type for any duplicate occurrence.
- This case should be logged as an error in the client, and we must not
acknowledge the duplicate offsets as the broker is already in a bad
state.
- PR adds an error log for this case and a unit test for the same.
Reviewers: Chia-Ping Tsai <[email protected]>, Andrew Schofield
<[email protected]>
---
.../consumer/internals/ShareCompletedFetch.java | 19 +++++--
.../internals/ShareCompletedFetchTest.java | 58 ++++++++++++++++++++++
2 files changed, 74 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 2c337782dd4..95e40a3c826 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -41,9 +41,9 @@ import org.slf4j.Logger;
import java.io.Closeable;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
@@ -99,10 +99,23 @@ public class ShareCompletedFetch {
}
private List<OffsetAndDeliveryCount>
buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords>
partitionAcquiredRecords) {
- List<OffsetAndDeliveryCount> acquiredRecordList = new LinkedList<>();
+ // Setting the size of the array to the size of the first batch of
acquired records. In case there is only 1 batch acquired, resizing would not
happen.
+ if (partitionAcquiredRecords.isEmpty()) {
+ return List.of();
+ }
+ int initialListSize = (int)
(partitionAcquiredRecords.get(0).lastOffset() -
partitionAcquiredRecords.get(0).firstOffset() + 1);
+ List<OffsetAndDeliveryCount> acquiredRecordList = new
ArrayList<>(initialListSize);
+
+ // Set to find duplicates in case of overlapping acquired records
+ Set<Long> offsets = new HashSet<>();
partitionAcquiredRecords.forEach(acquiredRecords -> {
for (long offset = acquiredRecords.firstOffset(); offset <=
acquiredRecords.lastOffset(); offset++) {
- acquiredRecordList.add(new OffsetAndDeliveryCount(offset,
acquiredRecords.deliveryCount()));
+ if (!offsets.add(offset)) {
+ log.error("Duplicate acquired record offset {} found in
share fetch response for partition {}. " +
+ "This indicates a broker processing issue.",
offset, partition.topicPartition());
+ } else {
+ acquiredRecordList.add(new OffsetAndDeliveryCount(offset,
acquiredRecords.deliveryCount()));
+ }
}
});
return acquiredRecordList;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index a1814fd935c..95f16966292 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -60,6 +60,7 @@ import static
org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShareCompletedFetchTest {
private static final String TOPIC_NAME = "test";
@@ -356,6 +357,63 @@ public class ShareCompletedFetchTest {
assertEquals(0, records.size());
}
+ @Test
+ public void
testOverlappingAcquiredRecordsLogsErrorAndRetainsFirstOccurrence() {
+ int startingOffset = 0;
+ int numRecords = 20; // Records for 0-19
+
+ // Create overlapping acquired records: [0-9] and [5-14]
+ // Offsets 5-9 will be duplicates
+ List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new
ArrayList<>();
+ acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+ .setFirstOffset(0L)
+ .setLastOffset(9L)
+ .setDeliveryCount((short) 1));
+ acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+ .setFirstOffset(5L)
+ .setLastOffset(14L)
+ .setDeliveryCount((short) 2));
+
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setRecords(newRecords(startingOffset, numRecords))
+ .setAcquiredRecords(acquiredRecords);
+
+ ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
+
+ Deserializers<String, String> deserializers = newStringDeserializers();
+
+ // Fetch records and verify that only 15 unique records are returned
(0-14)
+ ShareInFlightBatch<String, String> batch =
completedFetch.fetchRecords(deserializers, 20, true);
+ List<ConsumerRecord<String, String>> records =
batch.getInFlightRecords();
+
+ // Should get 15 unique records: 0-9 from first range (with
deliveryCount=1)
+ // and 10-14 from second range (with deliveryCount=2)
+ assertEquals(15, records.size());
+
+ // Verify first occurrence (offset 5 should have deliveryCount=1 from
first range)
+ ConsumerRecord<String, String> record5 = records.stream()
+ .filter(r -> r.offset() == 5L)
+ .findFirst()
+ .orElse(null);
+ assertNotNull(record5);
+ assertEquals(Optional.of((short) 1), record5.deliveryCount());
+
+ // Verify offset 10 has deliveryCount=2 from second range
+ ConsumerRecord<String, String> record10 = records.stream()
+ .filter(r -> r.offset() == 10L)
+ .findFirst()
+ .orElse(null);
+ assertNotNull(record10);
+ assertEquals(Optional.of((short) 2), record10.deliveryCount());
+
+ // Verify all offsets are unique
+ Set<Long> offsetSet = new HashSet<>();
+ for (ConsumerRecord<String, String> record : records) {
+ assertTrue(offsetSet.add(record.offset()),
+ "Duplicate offset found in results: " + record.offset());
+ }
+ }
+
private ShareCompletedFetch
newShareCompletedFetch(ShareFetchResponseData.PartitionData partitionData) {
LogContext logContext = new LogContext();
ShareFetchMetricsRegistry shareFetchMetricsRegistry = new
ShareFetchMetricsRegistry();