[ https://issues.apache.org/jira/browse/KAFKA-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838538#comment-15838538 ]
Francesco Lemma commented on KAFKA-4691: ---------------------------------------- Thanks for commenting Matthias. The fact that there's one single producer which uses byte[] for key and value isn't necessary a limitation. The problem is that the interceptor is called after the serialization happens. If the interceptor is called before (as it should according to the JavaDoc), then it can manipulate the message regardless of the type and, still regardless of the type, it will be then serialized to a byte[] by the RecordCollector and will be compatible with the Kafka Streams producer. Am I missing something here? I think the problem is where the interceptors are triggered. > ProducerInterceptor.onSend() is called after key and value are serialized > ------------------------------------------------------------------------- > > Key: KAFKA-4691 > URL: https://issues.apache.org/jira/browse/KAFKA-4691 > Project: Kafka > Issue Type: Bug > Components: clients, streams > Affects Versions: 0.10.1.1 > Reporter: Francesco Lemma > Labels: easyfix > Attachments: 2017-01-24 00_50_55-SDG_CR33_DevStudio - Java EE - > org.apache.kafka.streams.processor.internals.Reco.png > > > According to the JavaDoc > (https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html) > " This is called from KafkaProducer.send(ProducerRecord) and > KafkaProducer.send(ProducerRecord, Callback) methods, before key and value > get serialized and partition is assigned (if partition is not specified in > ProducerRecord)". > Although when using this with Kafka Streams > (StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)) the > key and value contained in the record object are already serialized. > As you can see from the screenshot, the serialization is performed inside > RecordCollectionImpl.send(ProducerRecord<K, V> record, Serializer<K> > keySerializer, Serializer<V> valueSerializer, > StreamPartitioner<K, V> partitioner), effectively > before calling the send method of the producer which will trigger the > interceptor. > This makes it unable to perform any kind of operation involving the key or > value of the message, unless at least performing an additional > deserialization. -- This message was sent by Atlassian JIRA (v6.3.4#6332)