[ https://issues.apache.org/jira/browse/KAFKA-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838558#comment-15838558 ]

Matthias J. Sax edited comment on KAFKA-4691 at 1/25/17 8:55 PM:
-----------------------------------------------------------------

Sorry for not being precise enough. For {{KafkaProducer}}, interceptors are 
called before the producer does any serialization. In Streams, however, the 
producer never sees unserialized data: before the producer's {{send()}} 
method is called, Streams has already serialized the data and hands a byte 
array to the producer. Note that {{RecordCollectorImpl}} is part of Streams, 
not of {{KafkaProducer}} (it just uses {{KafkaProducer}}). Btw: in this case, 
the producer skips its own serialization step because the data is already of 
type {{byte[]}}.

In Streams, we cannot give unserialized data to the {{KafkaProducer}} because 
the producer can only be configured with a single serializer. Since Streams 
needs to handle more than one key type, {{KafkaProducer}} cannot handle them, 
as it can only serialize a single type. The solution is to do the 
serialization before giving the data to the producer.
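The call ordering described above can be sketched with simplified stand-ins 
for the {{Serializer}} and interceptor interfaces (the names below mirror the 
real kafka-clients API but are self-contained assumptions, not the actual 
Kafka types):

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-ins for the kafka-clients interfaces (assumptions, not the real API).
interface Serializer<T> { byte[] serialize(String topic, T data); }
interface Interceptor { byte[] onSend(String topic, byte[] value); }

public class StreamsSendOrder {
    // Mimics RecordCollectorImpl: Streams serializes BEFORE handing data to
    // the producer, so the producer-side interceptor only ever sees byte[].
    static byte[] streamsSend(String topic, String value,
                              Serializer<String> serializer, Interceptor interceptor) {
        byte[] serialized = serializer.serialize(topic, value); // Streams-side serialization
        return producerSend(topic, serialized, interceptor);    // producer sees bytes only
    }

    // Mimics KafkaProducer.send(): the interceptor runs first, then the
    // producer's own serialization, which is a pass-through for byte[].
    static byte[] producerSend(String topic, byte[] value, Interceptor interceptor) {
        byte[] intercepted = interceptor.onSend(topic, value);  // already-serialized payload
        return intercepted;                                     // byte[] "serialization" is a no-op
    }

    public static void main(String[] args) {
        Serializer<String> utf8 = (t, v) -> v.getBytes(StandardCharsets.UTF_8);
        Interceptor printer = (t, v) -> {
            System.out.println("interceptor saw " + v.length + " bytes");
            return v;
        };
        streamsSend("topic", "hello", utf8, printer); // prints "interceptor saw 5 bytes"
    }
}
```

The point of the sketch: by the time {{onSend()}} fires inside 
{{producerSend()}}, the value is already a byte array, which is exactly what 
the reporter observed when using interceptors through Streams.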





> ProducerInterceptor.onSend() is called after key and value are serialized
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4691
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4691
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, streams
>    Affects Versions: 0.10.1.1
>            Reporter: Francesco Lemma
>              Labels: easyfix
>         Attachments: 2017-01-24 00_50_55-SDG_CR33_DevStudio - Java EE - 
> org.apache.kafka.streams.processor.internals.Reco.png
>
>
> According to the JavaDoc 
> (https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html)
>  " This is called from KafkaProducer.send(ProducerRecord) and 
> KafkaProducer.send(ProducerRecord, Callback) methods, before key and value 
> get serialized and partition is assigned (if partition is not specified in 
> ProducerRecord)".
> Although when using this with Kafka Streams 
> (StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)) the 
> key and value contained in the record object are already serialized.
> As you can see from the screenshot, the serialization is performed inside 
> RecordCollectorImpl.send(ProducerRecord<K, V> record, Serializer<K> 
> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<K, V> 
> partitioner), effectively before calling the send method of the producer, 
> which triggers the interceptor.
> This makes it impossible to perform any kind of operation involving the key 
> or value of the message without first performing an additional 
> deserialization.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
