[ 
https://issues.apache.org/jira/browse/KAFKA-1895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15879170#comment-15879170
 ] 

Armin Braun edited comment on KAFKA-1895 at 2/22/17 10:30 PM:
--------------------------------------------------------------

[~hachikuji]

{quote}

How would you propose to expose RawRecordIterator in the consumer API? The 
problem is that IO is currently driven through the poll() API, so introducing 
something else would be a bit weird (though maybe there's a nice way to do it).

{quote}

I admit this is potentially not ideal in terms of nice API design but I can see 
a relatively short route to implementation when adding a method like below to 
`org.apache.kafka.clients.consumer.Consumer`:

{code}
    public RawRecordIterator pollRaw(long timeout);
{code}
 
which would simply return the RawRecordIterator that you could also use to back 
the current:

{code}
    /**
     * @see KafkaConsumer#poll(long)
     */
    public ConsumerRecords<K, V> poll(long timeout);
{code}

... then you could deserialize the records from the `RawRecordIterator`  when 
instantiating `ConsumerRecords` from the `RawRecordIterator`.
I would also just deserialize when instantiating `ConsumerRecords` from 
`RawRecordIterator` and have that drain the full `RawRecordIterator` to support:

{code}
    /**
     * Get just the records for the given partition
     * 
     * @param partition The partition to get records for
     */
    public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
        List<ConsumerRecord<K, V>> recs = this.records.get(partition);
        if (recs == null)
            return Collections.emptyList();
        else
            return Collections.unmodifiableList(recs);
    }
{code}

still. So no API change needed here + I think moving to a reused set of 
`ByteBuffer` abstracted by `RawRecordIterator` instead of the current solution 
constantly allocating `ByteBuffer` wrapped in things like 
`org.apache.kafka.common.network.NetworkReceive` would be significantly faster 
by saving lots of system calls (wouldn't have to read the header(size) bytes 
separately anymore if pointers on the underlying buffers are used instead of 
setting up buffers and hence byte[] after making that extra system call just to 
read 4 bytes).

So in a nutshell, just allow `pollRaw` which would always return a 
`RawRecordIterator`. Retain all existing APIs and instantiate `ConsumerRecords` 
from the iterator in a (copying) way that keeps `ConsumerRecord` immutable.

Hope this was understandable :) I understand there are a few tricky spots here 
in the implementation (though given that a wealth of work on this exists 
especially in Spark it's not that hard in 2017 imo), but just from the API I 
think the approach is sound. Plus it doesn't require any trickery where 
`ConsumerRecord` instances either become mutable or must be "passed back 
somewhere" to make them reusable, while still offering a 0 GC raw API.

EDIT:

So the semantic I would be looking for would be this:

{code}
while(/*  whatever your use case desires */) { 
Rawrecorditerator rawIterator = consumer.pollRaw(); //same object/iterator 
should be returned here every time ... filled with new content.
   while(rawIterator.next()) {
      // do something with key and value buffer contents
   }
}
{code}


was (Author: original-brownbear):
[~hachikuji]

{quote}

How would you propose to expose RawRecordIterator in the consumer API? The 
problem is that IO is currently driven through the poll() API, so introducing 
something else would be a bit weird (though maybe there's a nice way to do it).

{quote}

I admit this is potentially not ideal in terms of nice API design but I can see 
a relatively short route to implementation when adding a method like below to 
`org.apache.kafka.clients.consumer.Consumer`:

{code}
    public RawRecordIterator pollRaw(long timeout);
{code}
 
which would simply return the RawRecordIterator that you could also use to back 
the current:

{code}
    /**
     * @see KafkaConsumer#poll(long)
     */
    public ConsumerRecords<K, V> poll(long timeout);
{code}

... then you could deserialize the records from the `RawRecordIterator`  when 
instantiating `ConsumerRecords` from the `RawRecordIterator`.
I would also just deserialize when instantiating `ConsumerRecords` from 
`RawRecordIterator` and have that drain the full `RawRecordIterator` to support:

{code}
    /**
     * Get just the records for the given partition
     * 
     * @param partition The partition to get records for
     */
    public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
        List<ConsumerRecord<K, V>> recs = this.records.get(partition);
        if (recs == null)
            return Collections.emptyList();
        else
            return Collections.unmodifiableList(recs);
    }
{code}

still. So no API change needed here + I think moving to a reused set of 
`ByteBuffer` abstracted by `RawRecordIterator` instead of the current solution 
constantly allocating `ByteBuffer` wrapped in things like 
`org.apache.kafka.common.network.NetworkReceive` would be significantly faster 
by saving lots of system calls (wouldn't have to read the header(size) bytes 
separately anymore if pointers on the underlying buffers are used instead of 
setting up buffers and hence byte[] after making that extra system call just to 
read 4 bytes).

So in a nutshell, just allow `pollRaw` which would always return a 
`RawRecordIterator`. Retain all existing APIs and instantiate `ConsumerRecords` 
from the iterator in a (copying) way that keeps `ConsumerRecord` immutable.

Hope this was understandable :) I understand there are a few tricky spots here 
in the implementation (though given that a wealth of work on this exists 
especially in Spark it's not that hard in 2017 imo), but just from the API I 
think the approach is sound. Plus it doesn't require any trickery where 
`ConsumerRecord` instances either become mutable or must be "passed back 
somewhere" to make them reusable, while still offering a 0 GC raw API.

EDIT:

So the semantic I would be looking for would be this:

{code}
while(/*  whatever your use case desires */) { 
Rawrecorditerator rawIterator = consumer.pollRaw(); //same object/iterator 
should be returned here every time ... filled with new content.
   while(rawIterator.next()) {
      // do something with key and value buffer contents
   }
}

> Investigate moving deserialization and decompression out of KafkaConsumer
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-1895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1895
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: consumer
>            Reporter: Jay Kreps
>
> The consumer implementation in KAFKA-1760 decompresses fetch responses and 
> deserializes them into ConsumerRecords which are then handed back as the 
> result of poll().
> There are several downsides to this:
> 1. It is impossible to scale serialization and decompression work beyond the 
> single thread running the KafkaConsumer.
> 2. The results can come back during the processing of other calls such as 
> commit() etc which can result in caching these records a little longer.
> An alternative would be to have ConsumerRecords wrap the actual compressed 
> serialized MemoryRecords chunks and do the deserialization during iteration. 
> This way you could scale this over a thread pool if needed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to