This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d6728d4b29a KAFKA-19789: Log an error when we get duplicate acquired offsets in ShareFetchResponse. (#20752)
d6728d4b29a is described below

commit d6728d4b29ae28b4016d5b8568b8971e7892f669
Author: Shivsundar R <[email protected]>
AuthorDate: Tue Oct 28 20:44:30 2025 -0400

    KAFKA-19789: Log an error when we get duplicate acquired offsets in ShareFetchResponse. (#20752)
    
    *What*
    https://issues.apache.org/jira/browse/KAFKA-19789
    
    - There were some scenarios where `ShareFetchResponse` contained
    duplicate acquired records; this was a broker-side bug.
    - Although this should ideally never happen, the client did not expect
    this case and acknowledged any duplicate occurrence with the `GAP`
    acknowledgement type.
    - This case should be logged as an error in the client, and we must not
    acknowledge the duplicate offsets, since the broker is already in a bad
    state.
    - The PR adds an error log for this case and a unit test covering it.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Andrew Schofield <[email protected]>
---
 .../consumer/internals/ShareCompletedFetch.java    | 19 +++++--
 .../internals/ShareCompletedFetchTest.java         | 58 ++++++++++++++++++++++
 2 files changed, 74 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 2c337782dd4..95e40a3c826 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -41,9 +41,9 @@ import org.slf4j.Logger;
 
 import java.io.Closeable;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Optional;
@@ -99,10 +99,23 @@ public class ShareCompletedFetch {
     }
 
     private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
-        List<OffsetAndDeliveryCount> acquiredRecordList = new LinkedList<>();
+        // Setting the size of the array to the size of the first batch of acquired records. In case there is only 1 batch acquired, resizing would not happen.
+        if (partitionAcquiredRecords.isEmpty()) {
+            return List.of();
+        }
+        int initialListSize = (int) (partitionAcquiredRecords.get(0).lastOffset() - partitionAcquiredRecords.get(0).firstOffset() + 1);
+        List<OffsetAndDeliveryCount> acquiredRecordList = new ArrayList<>(initialListSize);
+
+        // Set to find duplicates in case of overlapping acquired records
+        Set<Long> offsets = new HashSet<>();
         partitionAcquiredRecords.forEach(acquiredRecords -> {
             for (long offset = acquiredRecords.firstOffset(); offset <= acquiredRecords.lastOffset(); offset++) {
-                acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+                if (!offsets.add(offset)) {
+                    log.error("Duplicate acquired record offset {} found in share fetch response for partition {}. " +
+                            "This indicates a broker processing issue.", offset, partition.topicPartition());
+                } else {
+                    acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+                }
             }
         });
         return acquiredRecordList;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index a1814fd935c..95f16966292 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -60,6 +60,7 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ShareCompletedFetchTest {
     private static final String TOPIC_NAME = "test";
@@ -356,6 +357,63 @@ public class ShareCompletedFetchTest {
         assertEquals(0, records.size());
     }
 
+    @Test
+    public void testOverlappingAcquiredRecordsLogsErrorAndRetainsFirstOccurrence() {
+        int startingOffset = 0;
+        int numRecords = 20;        // Records for 0-19
+
+        // Create overlapping acquired records: [0-9] and [5-14]
+        // Offsets 5-9 will be duplicates
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new ArrayList<>();
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0L)
+                .setLastOffset(9L)
+                .setDeliveryCount((short) 1));
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(5L)
+                .setLastOffset(14L)
+                .setDeliveryCount((short) 2));
+
+        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setRecords(newRecords(startingOffset, numRecords))
+                .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
+
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // Fetch records and verify that only 15 unique records are returned (0-14)
+        ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 20, true);
+        List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
+        
+        // Should get 15 unique records: 0-9 from first range (with deliveryCount=1)
+        // and 10-14 from second range (with deliveryCount=2)
+        assertEquals(15, records.size());
+        
+        // Verify first occurrence (offset 5 should have deliveryCount=1 from first range)
+        ConsumerRecord<String, String> record5 = records.stream()
+                .filter(r -> r.offset() == 5L)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(record5);
+        assertEquals(Optional.of((short) 1), record5.deliveryCount());
+        
+        // Verify offset 10 has deliveryCount=2 from second range
+        ConsumerRecord<String, String> record10 = records.stream()
+                .filter(r -> r.offset() == 10L)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(record10);
+        assertEquals(Optional.of((short) 2), record10.deliveryCount());
+        
+        // Verify all offsets are unique
+        Set<Long> offsetSet = new HashSet<>();
+        for (ConsumerRecord<String, String> record : records) {
+            assertTrue(offsetSet.add(record.offset()), 
+                    "Duplicate offset found in results: " + record.offset());
+        }
+    }
+
     private ShareCompletedFetch newShareCompletedFetch(ShareFetchResponseData.PartitionData partitionData) {
         LogContext logContext = new LogContext();
         ShareFetchMetricsRegistry shareFetchMetricsRegistry = new ShareFetchMetricsRegistry();
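
A note on the collection change above: the patch also swaps the LinkedList for
an ArrayList presized from the first acquired batch, so the common single-batch
case never triggers a resize. A minimal sketch of the sizing arithmetic, using
hypothetical offsets:

    public class PresizeSketch {
        public static void main(String[] args) {
            // Hypothetical single batch covering offsets 100..149 inclusive.
            long firstOffset = 100L;
            long lastOffset = 149L;
            // Inclusive range, hence the +1; with a single batch the backing
            // array is allocated once and never grows.
            int initialListSize = (int) (lastOffset - firstOffset + 1);
            System.out.println(initialListSize); // prints 50
        }
    }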
