This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new e89fa449 KAFKA-13149; Fix NPE when handling malformed record data in 
produce requests (#11080)
e89fa449 is described below

commit e89fa449dbbba80bb9e3adf121439361b137ae8f
Author: Cong Ding <[email protected]>
AuthorDate: Tue Sep 14 11:47:54 2021 -0500

    KAFKA-13149; Fix NPE when handling malformed record data in produce 
requests (#11080)
    
    Raise `InvalidRecordException` from `DefaultRecord.readFrom` instead 
of returning null if there are not enough bytes remaining to read the record. 
This ensures that the broker can raise a useful exception for malformed record 
batches.
    
    Reviewers: Ismael Juma <[email protected]>, Jason Gustafson 
<[email protected]>
---
 .../java/org/apache/kafka/common/record/DefaultRecord.java     |  4 +++-
 .../java/org/apache/kafka/common/record/DefaultRecordTest.java | 10 ++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java 
b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index b63773b..8772556 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -293,7 +293,9 @@ public class DefaultRecord implements Record {
                                          Long logAppendTime) {
         int sizeOfBodyInBytes = ByteUtils.readVarint(buffer);
         if (buffer.remaining() < sizeOfBodyInBytes)
-            return null;
+            throw new InvalidRecordException("Invalid record size: expected " 
+ sizeOfBodyInBytes +
+                " bytes in record payload, but instead the buffer has only " + 
buffer.remaining() +
+                " remaining bytes.");
 
         int totalSizeInBytes = ByteUtils.sizeOfVarint(sizeOfBodyInBytes) + 
sizeOfBodyInBytes;
         return readFrom(buffer, totalSizeInBytes, sizeOfBodyInBytes, 
baseOffset, baseTimestamp,
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index af154d3..49743d2 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -481,4 +481,14 @@ public class DefaultRecordTest {
         assertEquals(RecordBatch.NO_SEQUENCE, record.sequence());
     }
 
+    @Test
+    public void testInvalidSizeOfBodyInBytes() {
+        int sizeOfBodyInBytes = 10;
+        ByteBuffer buf = ByteBuffer.allocate(5);
+        ByteUtils.writeVarint(sizeOfBodyInBytes, buf);
+
+        buf.flip();
+        assertThrows(InvalidRecordException.class,
+            () -> DefaultRecord.readFrom(buf, 0L, 0L, RecordBatch.NO_SEQUENCE, 
null));
+    }
 }

Reply via email to