[ https://issues.apache.org/jira/browse/KAFKA-1895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874894#comment-15874894 ]
Armin Braun commented on KAFKA-1895:
------------------------------------

[~jkreps] Maybe let me give the perspective of a user running Kafka in a situation where high throughput and a low GC footprint are key; for that use case this change would be extremely valuable. What I would ideally like to see here and in KAFKA-2045 is this: create a lower-level interface along the lines of Hadoop's [RawKeyValueIterator](http://atetric.com/atetric/javadoc/org.apache.hadoop/hadoop-mapreduce-client-core/2.6.2/src-html/org/apache/hadoop/mapred/RawKeyValueIterator.html) that would back the current KafkaConsumer interface:

{code}
public interface RawRecordIterator {

    /**
     * Gets the current raw key.
     *
     * @return the current raw key
     * @throws IOException if reading the key fails
     */
    ByteBuffer getKey() throws IOException;

    /**
     * Gets the current raw value.
     *
     * @return the current raw value
     * @throws IOException if reading the value fails
     */
    ByteBuffer getValue() throws IOException;

    /**
     * Sets up the current key and value (for getKey and getValue).
     *
     * @return <code>true</code> if there exists a key/value,
     *         <code>false</code> otherwise. <code>false</code> implies
     *         the need for making the next call to <code>poll()</code>.
     * @throws IOException if advancing the iterator fails
     */
    boolean next() throws IOException;

    /**
     * Polls Kafka for more records.
     *
     * @throws IOException if the poll fails
     */
    void poll() throws IOException;

    /**
     * Closes the iterator so that the underlying memory can be released.
     *
     * @throws IOException if closing fails
     */
    void close() throws IOException;
}
{code}

I think this one is pretty ideal if implemented properly:
* If you start from some initial size for two backing `ByteBuffer`s and grow them only when a poll doesn't fit into the existing ones, you can have a nice and fast zero-copy iterator: simply store the offsets and lengths for each record in `int` arrays (which you could also set up in some 0gc way easily) and move position and limit on the buffers on every call to `next`.
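The zero-copy mechanics described in the bullet above could be sketched roughly like this. Everything here is hypothetical illustration, not Kafka code: `PackedRecordIterator` and the interleaved offset/length layout are made-up names, and growing/refilling the backing buffer on `poll()` is omitted.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch: records packed into one backing ByteBuffer, iterated with no per-record allocation. */
final class PackedRecordIterator {

    private final ByteBuffer keyView;    // repositioned on every next(), never reallocated
    private final ByteBuffer valueView;
    private final int[] offsets;         // interleaved: k0, v0, k1, v1, ...
    private final int[] lengths;
    private final int recordCount;
    private int index = -1;

    PackedRecordIterator(ByteBuffer backing, int[] offsets, int[] lengths, int recordCount) {
        this.keyView = backing.duplicate();
        this.valueView = backing.duplicate();
        this.offsets = offsets;
        this.lengths = lengths;
        this.recordCount = recordCount;
    }

    /** Only moves position and limit on the two views; zero allocation per record. */
    boolean next() {
        if (index + 1 >= recordCount)
            return false;
        index++;
        int k = 2 * index, v = k + 1;
        keyView.limit(offsets[k] + lengths[k]).position(offsets[k]);
        valueView.limit(offsets[v] + lengths[v]).position(offsets[v]);
        return true;
    }

    ByteBuffer getKey()   { return keyView; }
    ByteBuffer getValue() { return valueView; }

    public static void main(String[] args) {
        byte[] packed = "k1v1k2v2".getBytes(StandardCharsets.UTF_8);
        PackedRecordIterator it = new PackedRecordIterator(
                ByteBuffer.wrap(packed), new int[]{0, 2, 4, 6}, new int[]{2, 2, 2, 2}, 2);
        while (it.next()) {
            byte[] key = new byte[it.getKey().remaining()];
            it.getKey().duplicate().get(key); // duplicate so the shared view's position is untouched
            System.out.println(new String(key, StandardCharsets.UTF_8));
        }
    }
}
```

The point of the sketch is that `getKey()`/`getValue()` hand back the same two `ByteBuffer` views for the lifetime of the iterator, so a full `poll()`/`next()` cycle allocates nothing per record.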
This interface can easily be used to generate the current `org.apache.kafka.clients.consumer.ConsumerRecords` in an efficient way, but it would also (if made public) allow users to implement more efficient serialization/deserialization approaches on top. Examples that would massively profit from this approach are Avro or Hadoop's Writables, which could keep reusing the same key and value objects without the need to constantly run expensive `byte[]` allocations. Having this as the basis of the current implementation, and publicly exposed, would deal with this concern you voiced:

{quote}
You could potentially implement both but you would need to change consumer.poll to allow passing back in the ConsumerRecords instance for reuse when you are done with it.
{quote}

-> no need for this then, in my opinion. Users can just come up with their own way to handle memory. Passing back records that aren't needed anymore seems less than practical to implement; it is much easier if this is just done by direct reuse of a single object, like in Hadoop, Avro or Spark.

Also,

{quote}
2. We don't mangle the code too badly in doing so
{quote}

wouldn't be much of a concern either imo, since you can keep the old interface around on top. When implementing this I'd simply try to get this interface in at the lowest possible level and keep the existing codebase on top. This would probably still allow some optimization in the existing code on top, and it would give users the ability to write much faster consumers. Again, this is kind of like Avro giving you the option to reuse an object when deserializing, while still keeping the slower API that gives you a new one on every `next` call around on top of things (https://avro.apache.org/docs/current/gettingstartedjava.html#Deserializing is what I mean).

PS: If this is seen as a viable approach I'd be happy to give it a go. This should be doable for me in a "not overly invasive to the existing codebase" way, I think.
> Investigate moving deserialization and decompression out of KafkaConsumer
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-1895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1895
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Jay Kreps
>
> The consumer implementation in KAFKA-1760 decompresses fetch responses and
> deserializes them into ConsumerRecords which are then handed back as the
> result of poll().
> There are several downsides to this:
> 1. It is impossible to scale serialization and decompression work beyond the
> single thread running the KafkaConsumer.
> 2. The results can come back during the processing of other calls such as
> commit() etc which can result in caching these records a little longer.
> An alternative would be to have ConsumerRecords wrap the actual compressed
> serialized MemoryRecords chunks and do the deserialization during iteration.
> This way you could scale this over a thread pool if needed.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)