This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eac193056ac MINOR: Add more unit tests to test GAP Acknowledgements in
ShareCompletedFetch (#21044)
eac193056ac is described below
commit eac193056ac9bd0448eaaf8b1a064fd8a6823554
Author: Shivsundar R <[email protected]>
AuthorDate: Tue Dec 2 13:36:38 2025 -0500
MINOR: Add more unit tests to test GAP Acknowledgements in
ShareCompletedFetch (#21044)
*What*
PR adds unit tests to `ShareCompletedFetch` which tests if **GAP**
acknowledgements are correctly handled in
`ShareCompletedFetch::fetchRecords()`.
Reviewers: Andrew Schofield <[email protected]>
---
.../internals/ShareCompletedFetchTest.java | 147 +++++++++++++++++++++
1 file changed, 147 insertions(+)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index c219b566992..3e6ccd8a7ba 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -338,6 +338,153 @@ public class ShareCompletedFetchTest {
assertEquals(0, records.size());
}
+ @Test
+ public void testGapsForControlRecordsInAcquiredRange() {
+ int numRecords = 10;
+ // Create records with transaction markers (control records)
+ Records rawRecords = newTransactionalRecords(numRecords);
+
+ // Acquire all records including the control record (offset 10 is the
commit marker)
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setRecords(rawRecords)
+ .setAcquiredRecords(acquiredRecords(0L, numRecords + 1));
+
+ ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
+ try (final Deserializers<String, String> deserializers =
newStringDeserializers()) {
+ ShareInFlightBatch<String, String> batch =
completedFetch.fetchRecords(deserializers, 15, true);
+ List<ConsumerRecord<String, String>> records =
batch.getInFlightRecords();
+
+ // Should get 10 actual records (control records are filtered out)
+ assertEquals(10, records.size());
+
+ // Should have 1 gap for the control record at offset 10
+ Acknowledgements acknowledgements = batch.getAcknowledgements();
+ assertEquals(1, acknowledgements.size());
+ assertNull(acknowledgements.get(10L), "Offset 10 (control record)
should be a GAP (null)");
+ }
+ }
+
+ @Test
+ public void testMixedRecordsAndGaps() {
+ int startingOffset = 0;
+
+ // Acquire records 0-4 (exist), 10-14 (don't exist = gaps)
+ List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new
ArrayList<>();
+ acquiredRecords.add(acquiredRecords(0L, 5).get(0));
+ acquiredRecords.add(acquiredRecords(10L, 5).get(0));
+
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setRecords(newRecords(startingOffset, 10))
+ .setAcquiredRecords(acquiredRecords); // Acquire only records
0-4 and 10-14
+
+ Deserializers<String, String> deserializers = newStringDeserializers();
+
+ ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
+
+ ShareInFlightBatch<String, String> batch =
completedFetch.fetchRecords(deserializers, 20, true);
+ List<ConsumerRecord<String, String>> records =
batch.getInFlightRecords();
+
+ // Should get 5 actual records (0-4)
+ assertEquals(5, records.size());
+ for (int i = 0; i < 5; i++) {
+ assertEquals(i, records.get(i).offset());
+ }
+
+ // Should have 5 gaps (10-14) in acknowledgements
+ Acknowledgements acknowledgements = batch.getAcknowledgements();
+ assertEquals(5, acknowledgements.size());
+
+ // Verify GAP acknowledgements for offsets 10-14
+ for (long offset = 10L; offset <= 14L; offset++) {
+ assertNull(acknowledgements.get(offset), "Offset " + offset + "
should be a GAP (null)");
+ }
+ }
+
+ @Test
+ public void testAcknowledgementsIncludeOnlyGaps() {
+ int startingOffset = 0;
+ int numRecords = 10; // Records for 0-9
+
+ // Acquire only non-existent records 15-19 (all should be gaps)
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setRecords(newRecords(startingOffset, numRecords)) //
Records 0-9
+ .setAcquiredRecords(acquiredRecords(15L, 5)); // Acquire
15-19 (don't exist)
+
+ Deserializers<String, String> deserializers = newStringDeserializers();
+
+ ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
+
+ ShareInFlightBatch<String, String> batch =
completedFetch.fetchRecords(deserializers, 20, true);
+ List<ConsumerRecord<String, String>> records =
batch.getInFlightRecords();
+
+ // Should get no actual records
+ assertEquals(0, records.size());
+
+ // Should have 5 gaps (15-19) in acknowledgements
+ Acknowledgements acknowledgements = batch.getAcknowledgements();
+ assertEquals(5, acknowledgements.size());
+
+ // Verify all are GAP acknowledgements
+ for (long offset = 15L; offset <= 19L; offset++) {
+ assertNull(acknowledgements.get(offset), "Offset " + offset + "
should be a GAP (null)");
+ }
+ }
+
+ @Test
+ public void testGapsWithControlRecordsAtBeginningAndEnd() {
+ // Create transactional records: control record, data records 1-5,
control record at 6
+ Time time = new MockTime();
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+ // Write first control record (commit marker at offset 0)
+ writeTransactionMarker(buffer, 0, time);
+
+ // Write data records 1-5
+ try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
+ RecordBatch.CURRENT_MAGIC_VALUE,
+ Compression.NONE,
+ TimestampType.CREATE_TIME,
+ 1,
+ time.milliseconds(),
+ PRODUCER_ID,
+ PRODUCER_EPOCH,
+ 0,
+ true,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH)) {
+ for (int i = 0; i < 5; i++)
+ builder.append(new SimpleRecord(time.milliseconds(),
"key".getBytes(), "value".getBytes()));
+ builder.build();
+ }
+
+ // Write second control record (commit marker at offset 6)
+ writeTransactionMarker(buffer, 6, time);
+
+ buffer.flip();
+ Records records = MemoryRecords.readableRecords(buffer);
+
+ // Acquire all offsets 0-6 (includes both control records and data
records)
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setRecords(records)
+ .setAcquiredRecords(acquiredRecords(0L, 7));
+
+ ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
+ try (final Deserializers<String, String> deserializers =
newStringDeserializers()) {
+ ShareInFlightBatch<String, String> batch =
completedFetch.fetchRecords(deserializers, 10, true);
+ List<ConsumerRecord<String, String>> fetchedRecords =
batch.getInFlightRecords();
+
+ // Should get 5 data records (1-5)
+ assertEquals(5, fetchedRecords.size());
+ assertEquals(1L, fetchedRecords.get(0).offset());
+ assertEquals(5L, fetchedRecords.get(4).offset());
+
+ // Should have 2 gaps for the control records (offsets 0 and 6)
+ Acknowledgements acknowledgements = batch.getAcknowledgements();
+ assertEquals(2, acknowledgements.size());
+ assertNull(acknowledgements.get(0L), "Offset 0 (control record)
should be a GAP (null)");
+ assertNull(acknowledgements.get(6L), "Offset 6 (control record)
should be a GAP (null)");
+ }
+ }
+
@Test
public void testAcquireOddRecords() {
int startingOffset = 0;