[ https://issues.apache.org/jira/browse/KAFKA-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838558#comment-15838558 ]
Matthias J. Sax edited comment on KAFKA-4691 at 1/25/17 8:55 PM:
-----------------------------------------------------------------
Sorry for not being precise enough. For {{KafkaProducer}}, interceptors are called before the producer does any serialization. In Streams, however, the producer never sees unserialized data: before the producer's {{send()}} method is called, Streams has already serialized the data and hands byte arrays to the producer. Note that {{RecordCollectorImpl}} is part of Streams, not of {{KafkaProducer}} (it merely uses {{KafkaProducer}}). Btw: in this case the producer skips its own serialization step because the data is already of type {{byte[]}}. Streams cannot give unserialized data to the {{KafkaProducer}} because the producer can only be configured with a single serializer. Since Streams needs to handle more than one key and value type, it cannot let {{KafkaProducer}} do the serialization, as the producer can only serialize a single type. The solution is to do the serialization before handing the data to the producer.
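The ordering described above can be sketched in plain Java. This is a minimal illustration with hypothetical stand-in types (`Serializer`, `Rec`, `onSend`, `streamsSend` are simplified placeholders, not the real Kafka classes): because Streams serializes before calling the producer, an interceptor configured through Streams only ever sees {{byte[]}}.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Minimal sketch (plain Java stand-ins, not the real Kafka API) of the
// ordering described above: Streams serializes first, so the interceptor
// only ever sees byte[].
public class InterceptorOrderSketch {

    // Stand-in for org.apache.kafka.common.serialization.Serializer<T>
    interface Serializer<T> { byte[] serialize(T data); }

    // Stand-in for a ProducerRecord whose key/value are already serialized
    static final class Rec {
        final byte[] key; final byte[] value;
        Rec(byte[] key, byte[] value) { this.key = key; this.value = value; }
    }

    // Stand-in for ProducerInterceptor.onSend(): under Streams it receives
    // raw bytes; recovering the original objects would require an extra
    // deserialization step.
    static Rec onSend(Rec record) { return record; }

    // Mirrors what RecordCollectorImpl.send() does: serialize, then hand
    // byte[] to the producer, whose own serialization step becomes a no-op.
    static Rec streamsSend(String key, String value, Serializer<String> ser) {
        return onSend(new Rec(ser.serialize(key), ser.serialize(value)));
    }

    public static void main(String[] args) {
        Serializer<String> utf8 = s -> s.getBytes(StandardCharsets.UTF_8);
        Rec r = streamsSend("user-1", "click", utf8);
        // The interceptor saw bytes, not the original strings:
        System.out.println(Arrays.equals(r.key, "user-1".getBytes(StandardCharsets.UTF_8)));
    }
}
```

With a plain {{KafkaProducer}} the serialization happens inside {{send()}}, after the interceptor runs; the sketch shows why inverting that order (serialize, then send) is what the interceptor observes under Streams.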
> ProducerInterceptor.onSend() is called after key and value are serialized
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4691
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4691
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, streams
>    Affects Versions: 0.10.1.1
>            Reporter: Francesco Lemma
>              Labels: easyfix
>         Attachments: 2017-01-24 00_50_55-SDG_CR33_DevStudio - Java EE - org.apache.kafka.streams.processor.internals.Reco.png
>
> According to the JavaDoc (https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html): "This is called from KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback) methods, before key and value get serialized and partition is assigned (if partition is not specified in ProducerRecord)".
> However, when using this with Kafka Streams (StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)), the key and value contained in the record object are already serialized.
> As you can see from the screenshot, the serialization is performed inside RecordCollectorImpl.send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<K, V> partitioner), effectively before calling the send method of the producer, which triggers the interceptor.
> This makes it impossible to perform any kind of operation involving the key or value of the message without performing an additional deserialization.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)