[ 
https://issues.apache.org/jira/browse/KAFKA-1895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15874894#comment-15874894
 ] 

Armin Braun commented on KAFKA-1895:
------------------------------------

[~jkreps] Maybe let me give the perspective of a user, using Kafka in a 
situation where high throughput and a low GC footprint are key and this change 
would be extremely valuable.

What I would ideally like to see here and in KAFKA-2045 is this:

Create a lower level interface along the lines of Hadoop's 
[RawKeyValueIterator](http://atetric.com/atetric/javadoc/org.apache.hadoop/hadoop-mapreduce-client-core/2.6.2/src-html/org/apache/hadoop/mapred/RawKeyValueIterator.html)
 that would back the current KafkaConsumer interface:

{code}public interface RawRecordIterator {
    /**
     * Gets the current raw key.
     *
     * @return Gets the current raw key
     * @throws IOException
     */
    ByteBuffer getKey() throws IOException;

    /**
     * Gets the current raw value.
     *
     * @return Gets the current raw value
     * @throws IOException
     */
    ByteBuffer getValue() throws IOException;

    /**
     * Sets up the current key and value (for getKey and getValue).
     *
     * @return <code>true</code> if there exists a key/value,
     *         <code>false</code> otherwise.
     *         <code>false</code> implies the need for making the next call to
     *         <code>poll()</code>
     * @throws IOException
     */
    boolean next() throws IOException;

    /**
     * Polls Kafka for more Records.
     *
     * @throws IOException
     */
    void poll() throws IOException;

    /**
     * Closes the iterator so that the underlying memory can be released.
     *
     * @throws IOException
     */
    void close() throws IOException;
}
{code}

I think this one is pretty idea if implemented properly:

* If you start from some initial size for two backing `ByteBuffer`s and grow 
them as needed, if a poll doesn't fit into the existing ones you can have a 
nice and fast zero copy iterator (simply store the offsets and lengths for each 
record in `int` arrays, that you could also set up in some 0gc way easily and 
move position and limit on the buffers on every call to `next`).

This interface can easily be used to generate the current 
`org.apache.kafka.clients.consumer.ConsumerRecords` in an efficient way (but 
would also if made public) allow users to implement more efficient 
serialization/deserialization approaches on top. Examples that would massively 
profit from this approach would be Avro or Hadoop's Writables, that could keep 
reusing the same key and value objects without the need to constantly run 
expensive `byte[]` allocations.

Having this as the basis of the current implementation and publicly exposed 
would deal with this concern you voiced:

{code}
You could potentially implement both but you would need to change consumer.poll 
to allow passing back in the ConsumerRecords instance for reuse when you are 
done with it.
{code}

-> no need for this then in my opinion. Users can just come up with their own 
way to handle memory. Passing back records that aren't needed anymore seems 
like less than practical to implement. Much easier if this is just done by 
direct reuse of a single object like in Hadoop, Avro or Spark.

Also 

{code}
2. We don't mangle the code to badly in doing so
{code}

wouldn't be much of a concern either imo since you can keep the old interface 
around on top. When implementing this I'd simply try to get this interface in 
at the lowest possible level and keep the existing codebase on top. This would 
probably still allow some optimization in the existing code on top + it would 
give uses the ability to write much faster consumers.
Again kind of like Avor has it with giving you the option to reuse and object 
when deserializing, but still keeping the slower API that gives you a new one 
one every `next` call around on top of things 
(https://avro.apache.org/docs/current/gettingstartedjava.html#Deserializing is 
what I mean).

PS: If this is seen as a viable approach I'd be happy to give it a go. This 
should be doable for me in a "not overly invasive to the existing codebase" way 
I think.


> Investigate moving deserialization and decompression out of KafkaConsumer
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-1895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1895
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Jay Kreps
>
> The consumer implementation in KAFKA-1760 decompresses fetch responses and 
> deserializes them into ConsumerRecords which are then handed back as the 
> result of poll().
> There are several downsides to this:
> 1. It is impossible to scale serialization and decompression work beyond the 
> single thread running the KafkaConsumer.
> 2. The results can come back during the processing of other calls such as 
> commit() etc which can result in caching these records a little longer.
> An alternative would be to have ConsumerRecords wrap the actual compressed 
> serialized MemoryRecords chunks and do the deserialization during iteration. 
> This way you could scale this over a thread pool if needed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to