[ 
https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569354#comment-17569354
 ] 

Doguscan Namal commented on KAFKA-13953:
----------------------------------------

I tried to reproduce the issue by sending a produce request that contains a 
record with a negative length. To do that, I modified DefaultRecord.java 
(change shown below) so that the record with offset delta 5, in the middle of 
a batch, is written with a corrupt length.

However, the broker detected that the record was not valid and rejected it:

`org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence will be rejected.`

Code that I changed to reproduce:
```
public static int writeTo(DataOutputStream out,
                          int offsetDelta,
                          long timestampDelta,
                          ByteBuffer key,
                          ByteBuffer value,
                          Header[] headers) throws IOException {
    int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
    int sizeOfSize;
    if (offsetDelta == 5) {
        // Corrupt this one record: write a negative length instead of the real body size.
        ByteUtils.writeVarint(-155493822, out);
        sizeOfSize = ByteUtils.sizeOfVarint(-155493822);
    } else {
        ByteUtils.writeVarint(sizeInBytes, out);
        sizeOfSize = ByteUtils.sizeOfVarint(sizeInBytes);
    }
    ...
    return sizeOfSize + sizeInBytes;
}
```
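
For anyone who wants to see what the corrupt length looks like on the wire, here is a minimal standalone sketch of a ZigZag varint encoder and decoder. It is a re-implementation for illustration, on the assumption that it matches the encoding `ByteUtils.writeVarint` uses; the class `VarintSketch` and its method bodies are mine, not Kafka code.

```java
import java.io.ByteArrayOutputStream;

class VarintSketch {
    // ZigZag-encode a signed int, then emit it as a base-128 varint
    // (7 payload bits per byte, high bit set on every byte except the last).
    static byte[] writeVarint(int value) {
        int v = (value << 1) ^ (value >> 31);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
        return out.toByteArray();
    }

    // Inverse of writeVarint: rebuild the raw value, then undo the ZigZag mapping.
    static int readVarint(byte[] bytes) {
        int raw = 0;
        int shift = 0;
        for (byte b : bytes) {
            raw |= (b & 0x7F) << shift;
            shift += 7;
            if ((b & 0x80) == 0) {
                break;
            }
        }
        return (raw >>> 1) ^ -(raw & 1);
    }

    public static void main(String[] args) {
        int corruptLength = -155493822; // the value used in the reproduction above
        byte[] encoded = writeVarint(corruptLength);
        System.out.println("encoded bytes: " + encoded.length);
        System.out.println("decoded value: " + readVarint(encoded));
    }
}
```

The negative value round-trips cleanly, so a reader of the batch sees a well-formed varint that decodes to a negative record length, which is the condition the reproduction was meant to create.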

> kafka Console consumer fails with CorruptRecordException 
> ---------------------------------------------------------
>
>                 Key: KAFKA-13953
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13953
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, controller, core
>    Affects Versions: 2.7.0
>            Reporter: Aldan Brito
>            Priority: Blocker
>
> Kafka consumer fails with corrupt record exception. 
> {code:java}
> opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*:<port> --topic BQR-PULL-DEFAULT --from-beginning > /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest
> [2022-05-15 18:34:15,146] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Received exception when fetching the next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record to continue consumption.
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577)
>         at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>         at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438)
>         at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
>         at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
>         at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
>         at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less than the minimum record overhead (14)
> Processed a total of 15765197 messages
> {code}
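
The wording of the consumer-side error in the quoted report reads like a fixed-minimum check on a size field. The following is only a sketch of such a check, not Kafka's code: the class `SizeCheckSketch`, the assumed layout (an 8-byte offset followed by a 4-byte size), and the exception type are all illustrative assumptions. Only the value 14 and the message text are taken from the stack trace above.

```java
import java.nio.ByteBuffer;

class SizeCheckSketch {
    static final int MIN_OVERHEAD = 14; // taken from the error message in the report
    static final int SIZE_OFFSET = 8;   // assumption: 8-byte offset, then a 4-byte size

    // Reads the size field at the buffer's current position (without consuming it)
    // and rejects anything smaller than the minimum overhead.
    static int checkedSize(ByteBuffer buffer) {
        int size = buffer.getInt(buffer.position() + SIZE_OFFSET);
        if (size < MIN_OVERHEAD) {
            throw new IllegalStateException(String.format(
                "Record size %d is less than the minimum record overhead (%d)",
                size, MIN_OVERHEAD));
        }
        return size;
    }

    public static void main(String[] args) {
        ByteBuffer zeroed = ByteBuffer.allocate(12); // all bytes are zero
        try {
            checkedSize(zeroed);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch, a zero-filled region trips the check with a size of 0, whereas the reproduction above wrote a negative per-record varint length, so the two failures are not necessarily the same condition.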



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
