This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new e89fa449 KAFKA-13149; Fix NPE when handling malformed record data in
produce requests (#11080)
e89fa449 is described below
commit e89fa449dbbba80bb9e3adf121439361b137ae8f
Author: Cong Ding <[email protected]>
AuthorDate: Tue Sep 14 11:47:54 2021 -0500
KAFKA-13149; Fix NPE when handling malformed record data in produce
requests (#11080)
Raise `InvalidRecordException` from `DefaultRecordBatch.readFrom` instead
of returning null if there are not enough bytes remaining to read the record.
This ensures that the broker can raise a useful exception for malformed record
batches.
Reviewers: Ismael Juma <[email protected]>, Jason Gustafson
<[email protected]>
---
.../java/org/apache/kafka/common/record/DefaultRecord.java | 4 +++-
.../java/org/apache/kafka/common/record/DefaultRecordTest.java | 10 ++++++++++
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index b63773b..8772556 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -293,7 +293,9 @@ public class DefaultRecord implements Record {
Long logAppendTime) {
int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
if (buffer.remaining() < sizeOfBodyInBytes)
- return null;
+ throw new InvalidRecordException("Invalid record size: expected "
+ sizeOfBodyInBytes +
+ " bytes in record payload, but instead the buffer has only " +
buffer.remaining() +
+ " remaining bytes.");
int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) +
sizeOfBodyInBytes;
return readFrom(buffer, totalSizeInBytes, sizeOfBodyInBytes,
baseOffset, baseTimestamp,
diff --git
a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index af154d3..49743d2 100644
---
a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -481,4 +481,14 @@ public class DefaultRecordTest {
assertEquals(RecordBatch.NO_SEQUENCE, record.sequence());
}
+ @Test
+ public void testInvalidSizeOfBodyInBytes() {
+ int sizeOfBodyInBytes = 10;
+ ByteBuffer buf = ByteBuffer.allocate(5);
+ ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+
+ buf.flip();
+ assertThrows(InvalidRecordException.class,
+ () -> DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE,
null));
+ }
}