[ 
https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17564575#comment-17564575
 ] 

Doguscan Namal commented on KAFKA-13953:
----------------------------------------

I have included the part of the data that I was able to read. Changing the 
leadership of the partition did not help, so I assume all replicas are 
corrupted. (The data retention period has expired now, so unfortunately I can 
no longer verify this.)
 * The corruption is not at the RecordBatch level but at the record level: I 
could read all of the data in that batch up to the problematic offset.
 * It fails at the following line: 
[https://github.com/apache/kafka/blob/2.5.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L296-L297]
 . sizeOfBodyInBytes is read as -155493822, when it should have been 1156.
 * I overrode that value for this offset, and the reader was then able to read 
the record data up to the 3rd of the record's 5 headers, failing here: 
[https://github.com/apache/kafka/blob/2.5.1/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L545]
 
 * It failed because headerValueSize was read as 1991988702, when it should 
have been 13. I realized that `ByteUtils.readVarint` was reading 5 bytes of 
data instead of 1.
 * Even overriding this headerValueSize did not help beyond this point: on the 
next read, headerKeySize came back as -58 and processing failed.
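The 5-bytes-instead-of-1 symptom above is exactly what happens when a byte in a varint field gets its continuation bit flipped: the decoder keeps consuming following bytes as part of the number. Here is a minimal sketch of a zigzag varint decoder in the style of `ByteUtils.readVarint` (toy class name `VarintDemo`; the corrupt byte values are made up for illustration, not taken from the actual log segment):

```java
import java.nio.ByteBuffer;

public class VarintDemo {
    // Zigzag base-128 varint decoder, mirroring the wire format Kafka uses
    // for record-level length fields (a sketch, not Kafka's actual code).
    static int readVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            value |= (b & 0x7F) << shift;  // low 7 bits carry data
            shift += 7;
        } while ((b & 0x80) != 0);         // high bit = "more bytes follow"
        // zigzag decode: maps unsigned back to signed
        return (value >>> 1) ^ -(value & 1);
    }

    public static void main(String[] args) {
        // A healthy single-byte varint: 0x1A zigzag-decodes to 13,
        // the expected headerValueSize from the comment above.
        System.out.println(readVarint(ByteBuffer.wrap(new byte[]{0x1A}))); // 13

        // Flip the continuation bit (0x80) on that same byte: the decoder
        // now swallows the following bytes (in a real segment, record
        // payload) as part of the varint, producing a huge bogus length.
        ByteBuffer corrupt = ByteBuffer.wrap(new byte[]{
                (byte) 0x9A, (byte) 0xDE, (byte) 0xAD, (byte) 0xBE, 0x3E});
        System.out.println(readVarint(corrupt)); // garbage, 5 bytes consumed
    }
}
```

So a single flipped bit in a length field is enough to turn 13 into a value like 1991988702, and it also desynchronizes every read after it.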

This makes me think that Kafka only has a partial record for this offset, and 
although there are more records in this batch after this point, none of them 
are accessible because this particular offset is corrupted.
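That the rest of the batch becomes unreachable follows from the storage layout: records inside a batch are length-prefixed and packed back to back, so the length fields are the only way to locate record boundaries. A toy model (1-byte lengths instead of Kafka's varints, hypothetical class name) shows the desynchronization:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class BatchDesyncDemo {
    // Toy batch format: records stored back to back as [1-byte length][payload].
    // Real Kafka batches use varint lengths, but the failure mode is the same:
    // one bad length field loses every record after it.
    static List<String> parseRecords(ByteBuffer buf) {
        List<String> out = new ArrayList<>();
        while (buf.hasRemaining()) {
            int len = buf.get() & 0xFF;
            if (len > buf.remaining())
                throw new IllegalStateException("corrupt length " + len
                        + " exceeds remaining " + buf.remaining());
            byte[] payload = new byte[len];
            buf.get(payload);
            out.add(new String(payload, StandardCharsets.UTF_8));
        }
        return out;
    }

    static ByteBuffer batchOf(String... records) {
        ByteBuffer buf = ByteBuffer.allocate(256);
        for (String r : records) {
            byte[] b = r.getBytes(StandardCharsets.UTF_8);
            buf.put((byte) b.length).put(b);
        }
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        System.out.println(parseRecords(batchOf("a", "bb", "ccc"))); // [a, bb, ccc]

        ByteBuffer bad = batchOf("a", "bb", "ccc");
        bad.put(2, (byte) 0x7F);  // corrupt the second record's length field
        try {
            parseRecords(bad);
        } catch (IllegalStateException e) {
            // records 2 and 3 are both lost, even though record 3's bytes are intact
            System.out.println("lost remainder of batch: " + e.getMessage());
        }
    }
}
```

This matches what I observed: record 3's bytes are physically present in the segment, but there is no way to find where they start once record 2's length is garbage.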

Q1) Is it possible for the producer to send a corrupted batch? Due to 
zero-copy, maybe the broker just copied the received content into the data log 
file?

Q2) I also see the `ProduceMessageConversionsPerSec` metric for this topic. 
Could the corruption be related to it, i.e. message conversion?
 * Topic is configured to use zstd compression
 * Kafka version 2.5.1

Here is the data from my println statements while this record is being read:

recordStart:0 attributes: 0 timestampDelta: 391 timestamp 1656027641475 offset: 
88062375700 sequence:40017233 key: java.nio.HeapByteBuffer[pos=0 lim=25 
cap=1199] value: java.nio.HeapByteBuffer[pos=0 lim=961 cap=1149] numHeaders: 5

headerValueSize: 12 capacity: 147
headerValueSize: 8 capacity: 122
headerValueSize: 1991988702 capacity: 90

> kafka Console consumer fails with CorruptRecordException 
> ---------------------------------------------------------
>
>                 Key: KAFKA-13953
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13953
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, controller, core
>    Affects Versions: 2.7.0
>            Reporter: Aldan Brito
>            Priority: Blocker
>
> Kafka consumer fails with corrupt record exception. 
> {code:java}
> opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*:<port> 
> --topic BQR-PULL-DEFAULT --from-beginning > 
> /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest
> [{*}2022-05-15 18:34:15,146]{*} ERROR Error processing message, terminating 
> consumer process:  (kafka.tools.ConsoleConsumer$)
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record 
> to continue consumption.
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684)
>         at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
>         at 
> kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438)
>         at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104)
>         at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78)
>         at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55)
>         at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size 
> 0 is less than the minimum record overhead (14)
> Processed a total of 15765197 messages {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
