[ https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173943#comment-14173943 ]
Ewen Cheslack-Postava commented on KAFKA-1196:
----------------------------------------------

This is a WIP patch to fix this issue, which previous discussion suggests was due to the FetchResponse exceeding 2GB. My approach to triggering the issue doesn't exhibit exactly the same symptom, but it does cause an unrecoverable error that terminates the consumer connection. (For reference, it causes the server to fail when FetchResponseSend.writeTo calls expectIncomplete and sendSize is negative due to overflow. This confuses the server, since it looks like the message has already finished sending, so the server forcibly closes the consumer's connection.) The patch addresses the core issue by ensuring the returned message doesn't exceed 2GB, dropping parts of it in a way that otherwise shouldn't affect the consumer. But there are a number of points that still need to be addressed:

* I started by building an integration test to trigger the issue, included in PrimitiveApiTest. However, since we necessarily need more than 2GB of data to trigger the issue, it's probably too expensive to include this way. Offline discussion suggests a system test would be a better place for it. It's still included here for completeness.
* The implementation filters down to a subset of the data in FetchResponse. The main reason is that this process needs to know the exact (or at least a conservative estimate of the) serialized size, which only FetchResponse knows. But it's also a bit unusual compared to other message classes, which are case classes and don't modify their inputs.
* Algorithm for choosing the subset to return: the initial approach is to remove random elements until we get below the limit (see the first sketch below). This is simple to understand and avoids starving specific TopicAndPartitions. Any concerns with this basic approach?
* I'm fairly sure I've kept the < 2GB case at effectively the same computational cost (computing the serialized size, grouped data, etc. exactly once, as before). For the > 2GB case, however, I've only ensured correctness. In particular, the progressive removal and re-evaluation of the serialized size could be very expensive for very large data sets (e.g. starting a mirror maker from scratch against a large data set with a large number of partitions).
* Note that the algorithm never touches the actual message data, only metadata about which messages are available. This is relevant since it is what suggested the approach in the patch could still be performant -- ReplicaManager.readMessageSets already processes the entire FetchRequest and filters it down because the metadata involved is relatively small.
* Based on the previous two points, this really needs more realistic, large-scale system tests to make sure the approach is not only correct but also performs reasonably (or to show that the algorithm for selecting a subset of the data needs revising).
* Testing isn't really complete -- I triggered the issue with 4 topics * 600 MB/topic, which totals more than 2GB. Another obvious case to check is a single partition that contains more than 2GB on its own.
* I'd like someone to help sanity-check the exact maximum FetchResponse serialized size we limit responses to. It's not Int.MaxValue, because the FetchResponseSend class adds 4 + FetchResponse.sizeInBytes for its own size (see the second sketch below). Is the extra 4 bytes enough, or is there any additional wrapping we might need to account for? Getting a test to hit exactly that narrow range could be tricky.
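For concreteness, here is a minimal sketch of the subset-selection idea (remove random elements until the serialized size fits). The names and the generic signature are hypothetical, not the patch itself; sizeOf stands in for recomputing FetchResponse.sizeInBytes over the remaining partition data, which is exactly the repeated re-evaluation that the performance bullet above flags as potentially expensive for very large responses.

{code:scala}
import scala.util.Random

object FetchResponseTruncation {
  // Sketch only: hypothetical names. `sizeOf` stands in for re-computing
  // FetchResponse.sizeInBytes over whatever partition data remains.
  def truncateToLimit[K, V](data: Map[K, V],
                            maxBytes: Long,
                            sizeOf: Map[K, V] => Long): Map[K, V] = {
    var remaining = data
    // Drop randomly chosen entries until the serialized size fits, so no
    // TopicAndPartition is systematically starved across repeated fetches.
    while (remaining.nonEmpty && sizeOf(remaining) > maxBytes) {
      val victim = remaining.keys.toSeq(Random.nextInt(remaining.size))
      remaining -= victim
    }
    remaining
  }
}
{code}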
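And a second sketch of the size bound question from the last bullet, assuming the only extra wrapping is the 4-byte size prefix that FetchResponseSend writes ahead of the response body; the constant name is made up for illustration.

{code:scala}
object FetchResponseLimits {
  // Hypothetical constant name. FetchResponseSend sends
  // 4 + FetchResponse.sizeInBytes, and both values are Ints, so if the
  // response body were allowed to reach Int.MaxValue the total would wrap
  // negative -- the overflow that makes expectIncomplete think the send
  // has already completed. Hence the body must stay <= Int.MaxValue - 4,
  // assuming no additional wrapping needs to be accounted for.
  val MaxFetchResponseBytes: Int = Int.MaxValue - 4

  def fits(responseSizeInBytes: Long): Boolean =
    responseSizeInBytes <= MaxFetchResponseBytes
}
{code}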
* The tests include both the immediate-response and purgatory paths, but the purgatory version requires a timeout in the test, which could end up being flaky and wasting time; it doesn't look like there's a good way to mock that right now. Maybe this doesn't matter if it moves to a system test?
* One case this doesn't handle yet is data that grows past 2GB after the request is already in the purgatory. The result is still correct, but the response isn't sent as soon as that condition is satisfied. Evaluating it exactly would appear to require calling readMessageSets and computing the size of the message for every DelayedFetch.isSatisfied call, which sounds like it could be pretty expensive. Maybe there's a better way, perhaps an approximate scheme?
* The test requires some extra bytes in the fetchSize for each partition, presumably for overhead in the encoding. I haven't tracked down exactly how big that should be, but I'm guessing it could affect the results of more comprehensive tests.

> java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-1196
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1196
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.8.0
>         Environment: running java 1.7, linux and kafka compiled against scala 2.9.2
>            Reporter: Gerrit Jansen van Vuuren
>            Assignee: Ewen Cheslack-Postava
>            Priority: Blocker
>              Labels: newbie
>             Fix For: 0.9.0
>
>         Attachments: KAFKA-1196.patch
>
>
> I have 6 topics, each with 8 partitions, spread over 4 kafka servers.
> The servers are 24 core, 72 GB RAM.
> While consuming from the topics I get an IllegalArgumentException and all consumption stops; the error keeps on throwing.
> I've tracked it down to FetchResponse.scala line 33.
> The error happens when the FetchResponsePartitionData object's readFrom method calls:
> messageSetBuffer.limit(messageSetSize)
> I put in some debug code: the messageSetSize is 671758648, while buffer.capacity() gives 155733313; for some reason the buffer is smaller than the required message size.
> I don't know the consumer code enough to debug this. It doesn't matter if compression is used or not.