[ 
https://issues.apache.org/jira/browse/KAFKA-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838538#comment-15838538
 ] 

Francesco Lemma commented on KAFKA-4691:
----------------------------------------

Thanks for commenting, Matthias. The fact that there is a single producer which 
uses byte[] for key and value isn't necessarily a limitation. The problem is 
that the interceptor is called after serialization happens. If the interceptor 
were called before serialization (as it should be according to the JavaDoc), it 
could manipulate the message regardless of its type. The message would then, 
still regardless of its type, be serialized to a byte[] by the RecordCollector 
and remain compatible with the Kafka Streams producer. Am I missing something 
here? I think the problem is where the interceptors are triggered.
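
To make the ordering argument concrete, here is a minimal sketch in plain Java. 
It does not use the Kafka API: Rec, serialize, decode, interceptThenSerialize 
and serializeThenIntercept are all hypothetical stand-ins that I made up for 
illustration, and the interceptor is modelled as a simple function on the 
record. It only shows that an interceptor running before serialization can work 
on typed values, while one running after serialization has to decode and 
re-encode the byte[] itself.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.UnaryOperator;

// Hypothetical model of the two call orders; none of these types are Kafka classes.
final class InterceptorOrderSketch {

    // Minimal stand-in for a producer record: just a key and a value.
    static final class Rec<K, V> {
        final K key;
        final V value;

        Rec(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for a String serializer.
    static byte[] serialize(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in for a String deserializer.
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Order described in the JavaDoc: intercept the typed record, then serialize.
    static Rec<byte[], byte[]> interceptThenSerialize(
            Rec<String, String> record,
            UnaryOperator<Rec<String, String>> interceptor) {
        Rec<String, String> r = interceptor.apply(record);
        return new Rec<>(serialize(r.key), serialize(r.value));
    }

    // Order reported in this issue: serialize first, then intercept the byte[] record.
    static Rec<byte[], byte[]> serializeThenIntercept(
            Rec<String, String> record,
            UnaryOperator<Rec<byte[], byte[]>> interceptor) {
        Rec<byte[], byte[]> bytes =
                new Rec<>(serialize(record.key), serialize(record.value));
        return interceptor.apply(bytes);
    }

    public static void main(String[] args) {
        Rec<String, String> in = new Rec<>("k", "hello");

        // The typed interceptor works directly on the String value.
        Rec<byte[], byte[]> before = interceptThenSerialize(
                in, r -> new Rec<>(r.key, r.value.toUpperCase()));

        // The byte[] interceptor needs an extra deserialize/serialize round trip.
        Rec<byte[], byte[]> after = serializeThenIntercept(
                in, r -> new Rec<>(r.key, serialize(decode(r.value).toUpperCase())));

        System.out.println(decode(before.value));
        System.out.println(decode(after.value));
    }
}
```

In both orders the record that reaches the producer is a byte[] pair, so 
calling the interceptor earlier would not conflict with a producer that only 
handles byte[]. The second variant only works if the interceptor knows which 
serializer was used.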

> ProducerInterceptor.onSend() is called after key and value are serialized
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4691
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4691
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, streams
>    Affects Versions: 0.10.1.1
>            Reporter: Francesco Lemma
>              Labels: easyfix
>         Attachments: 2017-01-24 00_50_55-SDG_CR33_DevStudio - Java EE - 
> org.apache.kafka.streams.processor.internals.Reco.png
>
>
> According to the JavaDoc 
> (https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html):
> "This is called from KafkaProducer.send(ProducerRecord) and 
> KafkaProducer.send(ProducerRecord, Callback) methods, before key and value 
> get serialized and partition is assigned (if partition is not specified in 
> ProducerRecord)".
> However, when using this with Kafka Streams 
> (StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)), the 
> key and value contained in the record object are already serialized.
> As you can see from the screenshot, the serialization is performed inside 
> RecordCollectorImpl.send(ProducerRecord<K, V> record, Serializer<K> 
> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<K, V> 
> partitioner), that is, before calling the send method of the producer, which 
> is what triggers the interceptor.
> This makes it impossible for the interceptor to perform any operation 
> involving the key or value of the message without first performing an 
> additional deserialization.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
