Sam Meder created KAFKA-994:
-------------------------------
Summary: High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size
Key: KAFKA-994
URL: https://issues.apache.org/jira/browse/KAFKA-994
Project: Kafka
Issue Type: Bug
Components: consumer
Affects Versions: 0.8
Reporter: Sam Meder
Assignee: Neha Narkhede

The high level consumer code is supposed to throw an exception when it
encounters a message that exceeds its configured max message size. The
relevant code from ConsumerIterator.scala is:
  // if we just updated the current chunk and it is empty that means the fetch size is too small!
  if(currentDataChunk.messages.validBytes == 0)
    throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
      "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
      .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
The problem is that KAFKA-846 changed PartitionTopicInfo.enqueue:
    def enqueue(messages: ByteBufferMessageSet) {
  -   val size = messages.sizeInBytes
  +   val size = messages.validBytes
      if(size > 0) {
i.e. chunks that contain only messages that are too big (validBytes == 0)
are never enqueued in the first place, so they never reach the too-large
message check in ConsumerIterator...
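
To make the failure mode concrete, here is a hypothetical sketch against the
0.8 client, assuming ByteBufferMessageSet's 0.8 semantics (validBytes counts
only complete messages); the 2000-byte payload and 1024-byte fetch size are
made up for illustration:

  import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

  // One message whose wire size exceeds the consumer's fetch size.
  val full = new ByteBufferMessageSet(NoCompressionCodec, new Message(("x" * 2000).getBytes))

  // Emulate a fetch.size of 1024: the broker returns only the first
  // 1024 bytes, i.e. a truncated prefix of the oversized message.
  val buf = full.buffer.duplicate()
  buf.limit(1024)
  val chunk = new ByteBufferMessageSet(buf.slice())

  assert(chunk.validBytes == 0)  // no complete message in the chunk
  assert(chunk.sizeInBytes > 0)  // but bytes were fetched
  // After KAFKA-846, enqueue() sees validBytes == 0 and silently drops this
  // chunk, so ConsumerIterator never gets the chance to throw
  // MessageSizeTooLargeException.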
I think that just changing "if(size > 0) {" to "if(messages.sizeInBytes > 0) {"
should do the trick?
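
For reference, a minimal sketch of what the patched enqueue would look like,
assuming the rest of the method body stays as it is after KAFKA-846 (elided
here):

  def enqueue(messages: ByteBufferMessageSet) {
    val size = messages.validBytes
    // Gate on the raw byte count rather than validBytes: a chunk holding only
    // a truncated oversized message has validBytes == 0 but sizeInBytes > 0,
    // and must still be enqueued so ConsumerIterator can throw
    // MessageSizeTooLargeException.
    if(messages.sizeInBytes > 0) {
      // ... existing enqueue logic unchanged ...
    }
  }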