This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fcb15e3 KAFKA-6292; Improve FileLogInputStream batch position checks to avoid type overflow (#4928)
fcb15e3 is described below
commit fcb15e357c1b818d2d543dc9db3e011ddc1fbf5e
Author: Roman Khlebnov <[email protected]>
AuthorDate: Wed May 9 03:07:50 2018 +0300
KAFKA-6292; Improve FileLogInputStream batch position checks to avoid type overflow (#4928)
Switch from addition to subtraction in the bounds checks to avoid type casting
and integer overflow in `FileLogInputStream`, especially when the
`log.segment.bytes` property is set close to `Integer.MAX_VALUE` and used
as the `position` inside the `nextBatch()` function.
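For illustration only (this sketch is not part of the patch, and the value 17
for HEADER_SIZE_UP_TO_MAGIC is assumed here to match DefaultRecordBatch): with
`position` near `Integer.MAX_VALUE`, the old addition-based check wraps around
to a negative int and passes when it should fail, while the subtraction-based
check stays in range.

    // Minimal sketch of the overflow the subtraction avoids.
    public class OverflowSketch {
        // Assumed to match DefaultRecordBatch.HEADER_SIZE_UP_TO_MAGIC (illustrative).
        static final int HEADER_SIZE_UP_TO_MAGIC = 17;

        public static void main(String[] args) {
            int position = Integer.MAX_VALUE - 5;
            int end = Integer.MAX_VALUE;

            // Old check: position + 17 overflows to a negative int, so the
            // comparison is false and nextBatch() would keep reading.
            System.out.println(position + HEADER_SIZE_UP_TO_MAGIC >= end); // false

            // New check: the subtraction stays in range and correctly reports
            // that fewer than HEADER_SIZE_UP_TO_MAGIC bytes remain.
            System.out.println(position >= end - HEADER_SIZE_UP_TO_MAGIC); // true
        }
    }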
Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>
---
.../kafka/common/record/FileLogInputStream.java | 4 ++--
.../kafka/common/record/FileLogInputStreamTest.java | 20 ++++++++++++++++++--
2 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index a1e3a2f..92e8864 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -60,7 +60,7 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
@Override
public FileChannelRecordBatch nextBatch() throws IOException {
FileChannel channel = fileRecords.channel();
- if (position + HEADER_SIZE_UP_TO_MAGIC >= end)
+ if (position >= end - HEADER_SIZE_UP_TO_MAGIC)
return null;
logHeaderBuffer.rewind();
@@ -75,7 +75,7 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
    "overhead (%d) in file %s.", size, LegacyRecord.RECORD_OVERHEAD_V0, fileRecords.file()));
- if (position + LOG_OVERHEAD + size > end)
+ if (position > end - LOG_OVERHEAD - size)
return null;
byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
index 95b2a0c..77aaae8 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -112,8 +112,8 @@ public class FileLogInputStreamTest {
SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
-
};
+
SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
new SimpleRecord(897839L, null, "4".getBytes()),
@@ -152,8 +152,8 @@ public class FileLogInputStreamTest {
SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
-
};
+
SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
new SimpleRecord(897839L, null, "4".getBytes()),
@@ -204,6 +204,22 @@ public class FileLogInputStreamTest {
}
}
+ @Test
+ public void testNextBatchSelectionWithMaxedParams() throws IOException {
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, Integer.MAX_VALUE, Integer.MAX_VALUE);
+ assertNull(logInputStream.nextBatch());
+ }
+ }
+
+ @Test
+ public void testNextBatchSelectionWithZeroedParams() throws IOException {
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, 0);
+ assertNull(logInputStream.nextBatch());
+ }
+ }
+
private void assertProducerData(RecordBatch batch, long producerId, short producerEpoch, int baseSequence,
    boolean isTransactional, SimpleRecord ... records) {
assertEquals(producerId, batch.producerId());
--
To stop receiving notification emails like this one, please contact
[email protected].