This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 484ba83f596 KAFKA-18683: Handle slicing of file records for updated
start position (#18759)
484ba83f596 is described below
commit 484ba83f5968285a1a93bd1b3632e26bf5c21fb1
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri Jan 31 23:43:51 2025 +0000
KAFKA-18683: Handle slicing of file records for updated start position
(#18759)
The PR corrects the check that was introduced in #5332, where the position is
checked to be within the boundaries of the file. The check
`position > currentSizeInBytes - start`
is incorrect, since the position is relative to `start`.
Reviewers: Jun Rao <[email protected]>
---
.../apache/kafka/common/record/FileRecords.java | 4 ++-
.../kafka/common/record/FileRecordsTest.java | 34 ++++++++++++++++++++++
2 files changed, 37 insertions(+), 1 deletion(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 64dd73de412..2aa940b8078 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -161,7 +161,9 @@ public class FileRecords extends AbstractRecords implements
Closeable {
if (position < 0)
throw new IllegalArgumentException("Invalid position: " + position
+ " in read from " + this);
- if (position > currentSizeInBytes - start)
+ // position should always be relative to the start of the file hence
compare with file size
+ // to verify if the position is within the file.
+ if (position > currentSizeInBytes)
throw new IllegalArgumentException("Slice from position " +
position + " exceeds end position of " + this);
if (size < 0)
throw new IllegalArgumentException("Invalid size: " + size + " in
read from " + this);
diff --git
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 4461108713c..c608fd7dd9e 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -44,6 +44,7 @@ import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.stream.IntStream;
import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.utf8;
@@ -433,6 +434,39 @@ public class FileRecordsTest {
}
}
+ /**
+ * Test slice when already sliced file records have start position greater
than available bytes
+ * in the file records.
+ */
+ @Test
+ public void testSliceForAlreadySlicedFileRecords() throws IOException {
+ byte[][] values = new byte[][] {
+ "abcd".getBytes(),
+ "efgh".getBytes(),
+ "ijkl".getBytes(),
+ "mnop".getBytes(),
+ "qrst".getBytes()
+ };
+ try (FileRecords fileRecords = createFileRecords(values)) {
+ List<RecordBatch> items = batches(fileRecords.slice(0,
fileRecords.sizeInBytes()));
+
+ // Slice from fourth message until the end.
+ int position = IntStream.range(0, 3).map(i ->
items.get(i).sizeInBytes()).sum();
+ FileRecords sliced = fileRecords.slice(position,
fileRecords.sizeInBytes() - position);
+ assertEquals(fileRecords.sizeInBytes() - position,
sliced.sizeInBytes());
+ assertEquals(items.subList(3, items.size()), batches(sliced),
"Read starting from the fourth message");
+
+ // Further slice the already sliced file records, from fifth
message until the end. Now the
+ // bytes available in the sliced file records are less than the
start position. However, the
+ // position to slice is relative hence reset position to second
message in the sliced file
+ // records i.e. reset with the size of the fourth message from the
original file records.
+ position = items.get(4).sizeInBytes();
+ FileRecords finalSliced = sliced.slice(position,
sliced.sizeInBytes() - position);
+ assertEquals(sliced.sizeInBytes() - position,
finalSliced.sizeInBytes());
+ assertEquals(items.subList(4, items.size()), batches(finalSliced),
"Read starting from the fifth message");
+ }
+ }
+
private void testSearchForTimestamp(RecordVersion version) throws
IOException {
File temp = tempFile();
FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024,
true);