This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 484ba83f596 KAFKA-18683: Handle slicing of file records for updated 
start position (#18759)
484ba83f596 is described below

commit 484ba83f5968285a1a93bd1b3632e26bf5c21fb1
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri Jan 31 23:43:51 2025 +0000

    KAFKA-18683: Handle slicing of file records for updated start position 
(#18759)
    
    The PR corrects the check introduced in #5332, where the position is 
checked to be within the boundaries of the file. The check
        position > currentSizeInBytes - start
    is incorrect, since the position is relative to start.
    
    Reviewers: Jun Rao <[email protected]>
---
 .../apache/kafka/common/record/FileRecords.java    |  4 ++-
 .../kafka/common/record/FileRecordsTest.java       | 34 ++++++++++++++++++++++
 2 files changed, 37 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 64dd73de412..2aa940b8078 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -161,7 +161,9 @@ public class FileRecords extends AbstractRecords implements 
Closeable {
 
         if (position < 0)
             throw new IllegalArgumentException("Invalid position: " + position 
+ " in read from " + this);
-        if (position > currentSizeInBytes - start)
+        // position should always be relative to the start of the file hence 
compare with file size
+        // to verify if the position is within the file.
+        if (position > currentSizeInBytes)
             throw new IllegalArgumentException("Slice from position " + 
position + " exceeds end position of " + this);
         if (size < 0)
             throw new IllegalArgumentException("Invalid size: " + size + " in 
read from " + this);
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 4461108713c..c608fd7dd9e 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -44,6 +44,7 @@ import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.stream.IntStream;
 
 import static java.util.Arrays.asList;
 import static org.apache.kafka.common.utils.Utils.utf8;
@@ -433,6 +434,39 @@ public class FileRecordsTest {
         }
     }
 
+    /**
+     * Test slice when already sliced file records have start position greater 
than available bytes
+     * in the file records.
+     */
+    @Test
+    public void testSliceForAlreadySlicedFileRecords() throws IOException {
+        byte[][] values = new byte[][] {
+            "abcd".getBytes(),
+            "efgh".getBytes(),
+            "ijkl".getBytes(),
+            "mnop".getBytes(),
+            "qrst".getBytes()
+        };
+        try (FileRecords fileRecords = createFileRecords(values)) {
+            List<RecordBatch> items = batches(fileRecords.slice(0, 
fileRecords.sizeInBytes()));
+
+            // Slice from fourth message until the end.
+            int position = IntStream.range(0, 3).map(i -> 
items.get(i).sizeInBytes()).sum();
+            FileRecords sliced  = fileRecords.slice(position, 
fileRecords.sizeInBytes() - position);
+            assertEquals(fileRecords.sizeInBytes() - position, 
sliced.sizeInBytes());
+            assertEquals(items.subList(3, items.size()), batches(sliced), 
"Read starting from the fourth message");
+
+            // Further slice the already sliced file records, from fifth 
message until the end. Now the
+            // bytes available in the sliced file records are less than the 
start position. However, the
+            // position to slice is relative hence reset position to second 
message in the sliced file
+            // records i.e. reset with the size of the fourth message from the 
original file records.
+            position = items.get(4).sizeInBytes();
+            FileRecords finalSliced = sliced.slice(position, 
sliced.sizeInBytes() - position);
+            assertEquals(sliced.sizeInBytes() - position, 
finalSliced.sizeInBytes());
+            assertEquals(items.subList(4, items.size()), batches(finalSliced), 
"Read starting from the fifth message");
+        }
+    }
+
     private void testSearchForTimestamp(RecordVersion version) throws 
IOException {
         File temp = tempFile();
         FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, 
true);

Reply via email to