[ https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569354#comment-17569354 ]
Doguscan Namal commented on KAFKA-13953: ---------------------------------------- I tried to reproduce the issue by sending a negative length for a record on the produce request. I overrode the DefaultRecord.java with the following, to corrupt the fifth record in the middle of a batch. However, Broker detected that record was not valid `org.apache.kafka.common.InvalidRecordException: This record has failed the validation on broker and hence will be rejected.` Code that I changed to reproduce: ``` public static int writeTo(DataOutputStream out, int offsetDelta, long timestampDelta, ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException { int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers); int sizeOfSize; if ( offsetDelta == 5 ) { ByteUtils.writeVarint(-155493822, out); sizeOfSize = ByteUtils.sizeOfVarint(-155493822); } else { ByteUtils.writeVarint(sizeInBytes, out); sizeOfSize = ByteUtils.sizeOfVarint(sizeInBytes); } ... return sizeOfSize + sizeInBytes; } ``` > kafka Console consumer fails with CorruptRecordException > --------------------------------------------------------- > > Key: KAFKA-13953 > URL: https://issues.apache.org/jira/browse/KAFKA-13953 > Project: Kafka > Issue Type: Bug > Components: consumer, controller, core > Affects Versions: 2.7.0 > Reporter: Aldan Brito > Priority: Blocker > > Kafka consumer fails with corrupt record exception. > {code:java} > opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*:<port> > --topic BQR-PULL-DEFAULT --from-beginning > > /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest > [{*}2022-05-15 18:34:15,146]{*} ERROR Error processing message, terminating > consumer process: (kafka.tools.ConsoleConsumer$) > org.apache.kafka.common.KafkaException: Received exception when fetching the > next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record > to continue consumption. > at > org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577) > at > org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) > at > kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438) > at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104) > at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78) > at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55) > at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) > Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size > 0 is less than the minimum record overhead (14) > Processed a total of 15765197 messages {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)