This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eac193056ac MINOR: Add more unit tests to test GAP Acknowledgements in ShareCompletedFetch (#21044)
eac193056ac is described below

commit eac193056ac9bd0448eaaf8b1a064fd8a6823554
Author: Shivsundar R <[email protected]>
AuthorDate: Tue Dec 2 13:36:38 2025 -0500

    MINOR: Add more unit tests to test GAP Acknowledgements in ShareCompletedFetch (#21044)
    
    *What*
    PR adds unit tests to `ShareCompletedFetch` which tests if **GAP**
    acknowledgements are correctly handled in
    `ShareCompletedFetch::fetchRecords()`.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../internals/ShareCompletedFetchTest.java         | 147 +++++++++++++++++++++
 1 file changed, 147 insertions(+)

diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index c219b566992..3e6ccd8a7ba 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -338,6 +338,153 @@ public class ShareCompletedFetchTest {
         assertEquals(0, records.size());
     }
 
+    @Test
+    public void testGapsForControlRecordsInAcquiredRange() {
+        int numRecords = 10;
+        // Create records with transaction markers (control records)
+        Records rawRecords = newTransactionalRecords(numRecords);
+
+        // Acquire all records including the control record (offset 10 is the commit marker)
+        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setRecords(rawRecords)
+                .setAcquiredRecords(acquiredRecords(0L, numRecords + 1));
+
+        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
+        try (final Deserializers<String, String> deserializers = newStringDeserializers()) {
+            ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 15, true);
+            List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
+
+            // Should get 10 actual records (control records are filtered out)
+            assertEquals(10, records.size());
+
+            // Should have 1 gap for the control record at offset 10
+            Acknowledgements acknowledgements = batch.getAcknowledgements();
+            assertEquals(1, acknowledgements.size());
+            assertNull(acknowledgements.get(10L), "Offset 10 (control record) should be a GAP (null)");
+        }
+    }
+
+    @Test
+    public void testMixedRecordsAndGaps() {
+        int startingOffset = 0;
+
+        // Acquire records 0-4 (exist), 10-14 (don't exist = gaps)
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new ArrayList<>();
+        acquiredRecords.add(acquiredRecords(0L, 5).get(0));
+        acquiredRecords.add(acquiredRecords(10L, 5).get(0));
+
+        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setRecords(newRecords(startingOffset,  10))
+                .setAcquiredRecords(acquiredRecords); // Acquire only records 0-4 and 10-14
+
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
+
+        ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 20, true);
+        List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
+
+        // Should get 5 actual records (0-4)
+        assertEquals(5, records.size());
+        for (int i = 0; i < 5; i++) {
+            assertEquals(i, records.get(i).offset());
+        }
+
+        // Should have 5 gaps (10-14) in acknowledgements
+        Acknowledgements acknowledgements = batch.getAcknowledgements();
+        assertEquals(5, acknowledgements.size());
+
+        // Verify GAP acknowledgements for offsets 10-14
+        for (long offset = 10L; offset <= 14L; offset++) {
+            assertNull(acknowledgements.get(offset), "Offset " + offset + " should be a GAP (null)");
+        }
+    }
+
+    @Test
+    public void testAcknowledgementsIncludeOnlyGaps() {
+        int startingOffset = 0;
+        int numRecords = 10;        // Records for 0-9
+
+        // Acquire only non-existent records 15-19 (all should be gaps)
+        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setRecords(newRecords(startingOffset, numRecords))  // Records 0-9
+                .setAcquiredRecords(acquiredRecords(15L, 5));       // Acquire 15-19 (don't exist)
+
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
+
+        ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 20, true);
+        List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
+
+        // Should get no actual records
+        assertEquals(0, records.size());
+
+        // Should have 5 gaps (15-19) in acknowledgements
+        Acknowledgements acknowledgements = batch.getAcknowledgements();
+        assertEquals(5, acknowledgements.size());
+
+        // Verify all are GAP acknowledgements
+        for (long offset = 15L; offset <= 19L; offset++) {
+            assertNull(acknowledgements.get(offset), "Offset " + offset + " should be a GAP (null)");
+        }
+    }
+
+    @Test
+    public void testGapsWithControlRecordsAtBeginningAndEnd() {
+        // Create transactional records: control record, data records 1-5, control record at 6
+        Time time = new MockTime();
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+        // Write first control record (commit marker at offset 0)
+        writeTransactionMarker(buffer, 0, time);
+
+        // Write data records 1-5
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
+                RecordBatch.CURRENT_MAGIC_VALUE,
+                Compression.NONE,
+                TimestampType.CREATE_TIME,
+                1,
+                time.milliseconds(),
+                PRODUCER_ID,
+                PRODUCER_EPOCH,
+                0,
+                true,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH)) {
+            for (int i = 0; i < 5; i++)
+                builder.append(new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
+            builder.build();
+        }
+
+        // Write second control record (commit marker at offset 6)
+        writeTransactionMarker(buffer, 6, time);
+
+        buffer.flip();
+        Records records = MemoryRecords.readableRecords(buffer);
+
+        // Acquire all offsets 0-6 (includes both control records and data records)
+        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setRecords(records)
+                .setAcquiredRecords(acquiredRecords(0L, 7));
+
+        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
+        try (final Deserializers<String, String> deserializers = newStringDeserializers()) {
+            ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 10, true);
+            List<ConsumerRecord<String, String>> fetchedRecords = batch.getInFlightRecords();
+
+            // Should get 5 data records (1-5)
+            assertEquals(5, fetchedRecords.size());
+            assertEquals(1L, fetchedRecords.get(0).offset());
+            assertEquals(5L, fetchedRecords.get(4).offset());
+
+            // Should have 2 gaps for the control records (offsets 0 and 6)
+            Acknowledgements acknowledgements = batch.getAcknowledgements();
+            assertEquals(2, acknowledgements.size());
+            assertNull(acknowledgements.get(0L), "Offset 0 (control record) should be a GAP (null)");
+            assertNull(acknowledgements.get(6L), "Offset 6 (control record) should be a GAP (null)");
+        }
+    }
+
     @Test
     public void testAcquireOddRecords() {
         int startingOffset = 0;

Reply via email to