[ https://issues.apache.org/jira/browse/KAFKA-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838619#comment-15838619 ]
Francesco Lemma commented on KAFKA-4691:
----------------------------------------

Thanks Matthias. I understand what you are saying and I agree 100%. Maybe I'm not expressing myself clearly.

What I'm proposing is to move the call to the interceptors out of the {{KafkaProducer}} and into the {{RecordCollectorImpl}}. To avoid affecting non-Streams users, there could be an overloaded method in the {{KafkaProducer}} similar to this:

{code:java}
// Overloaded method. RecordCollectorImpl would call this with triggerInterceptors = false.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback, boolean triggerInterceptors) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null || !triggerInterceptors ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

// This method keeps the original signature. Existing callers of KafkaProducer.send keep calling it.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    return send(record, callback, true);
}
{code}

Then the {{RecordCollectorImpl.send(...)}} method could potentially be modified as follows:

{code:java}
@Override
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer,
                        StreamPartitioner<K, V> partitioner) {
    // The visibility of the interceptors would need to be changed, or some other way to expose them would need to be implemented.
    // This line has been moved here from the KafkaProducer.
    ProducerRecord<K, V> interceptedRecord = this.producer.interceptors == null ? record : this.producer.interceptors.onSend(record);

    byte[] keyBytes = keySerializer.serialize(interceptedRecord.topic(), interceptedRecord.key());
    byte[] valBytes = valueSerializer.serialize(interceptedRecord.topic(), interceptedRecord.value());
    Integer partition = interceptedRecord.partition();
    if (partition == null && partitioner != null) {
        List<PartitionInfo> partitions = this.producer.partitionsFor(interceptedRecord.topic());
        if (partitions != null && partitions.size() > 0)
            partition = partitioner.partition(interceptedRecord.key(), interceptedRecord.value(), partitions.size());
    }

    ProducerRecord<byte[], byte[]> serializedRecord =
            new ProducerRecord<>(interceptedRecord.topic(), partition, interceptedRecord.timestamp(), keyBytes, valBytes);
    final String topic = serializedRecord.topic();
    .......
    .......
    .......
}
{code}

I would be more than willing to make the changes and send a pull request if this makes sense to you. I believe that the ProducerInterceptor is a very valuable feature, and this issue makes it much less useful within Kafka Streams.

> ProducerInterceptor.onSend() is called after key and value are serialized
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4691
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4691
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, streams
>    Affects Versions: 0.10.1.1
>            Reporter: Francesco Lemma
>              Labels: easyfix
>         Attachments: 2017-01-24 00_50_55-SDG_CR33_DevStudio - Java EE -
> org.apache.kafka.streams.processor.internals.Reco.png
>
>
> According to the JavaDoc
> (https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html)
> "This is called from KafkaProducer.send(ProducerRecord) and
> KafkaProducer.send(ProducerRecord, Callback) methods, before key and value
> get serialized and partition is assigned (if partition is not specified in
> ProducerRecord)".
> However, when using this with Kafka Streams
> (StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)) the
> key and value contained in the record object are already serialized.
> As you can see from the screenshot, the serialization is performed inside
> RecordCollectorImpl.send(ProducerRecord<K, V> record, Serializer<K>
> keySerializer, Serializer<V> valueSerializer,
> StreamPartitioner<K, V> partitioner), effectively
> before calling the send method of the producer which will trigger the
> interceptor.
> This makes it impossible to perform any kind of operation involving the key or
> value of the message without first performing an additional
> deserialization.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
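
The ordering problem described in the issue can be illustrated with a small, self-contained sketch. Everything in it is an assumption made for illustration: {{Rec}}, {{onSend}}, {{serialize}} and the two {{intercept...}} methods are hypothetical stand-ins, not Kafka classes or the actual Streams code path. It only shows that an interceptor invoked after serialization sees byte arrays, while one invoked before serialization sees the typed key and value.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: Rec and the methods below are hypothetical stand-ins,
// not Kafka classes or APIs.
class InterceptorOrderDemo {

    /** Minimal record holder, loosely modelled on ProducerRecord. */
    static final class Rec<K, V> {
        final String topic;
        final K key;
        final V value;

        Rec(String topic, K key, V value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for an interceptor's onSend: reports the runtime type of the value it is given. */
    static String onSend(Rec<?, ?> rec) {
        return rec.value.getClass().getSimpleName();
    }

    /** Stand-in for the serialization step performed by the record collector. */
    static Rec<byte[], byte[]> serialize(Rec<String, String> rec) {
        return new Rec<>(rec.topic,
                rec.key.getBytes(StandardCharsets.UTF_8),
                rec.value.getBytes(StandardCharsets.UTF_8));
    }

    /** Order reported in the issue: serialize first, then intercept. */
    static String interceptAfterSerialization(Rec<String, String> rec) {
        return onSend(serialize(rec));
    }

    /** Order proposed in the comment: intercept the typed record, then serialize. */
    static String interceptBeforeSerialization(Rec<String, String> rec) {
        String seen = onSend(rec);
        serialize(rec); // serialization still happens, but after the interceptor ran
        return seen;
    }

    public static void main(String[] args) {
        Rec<String, String> rec = new Rec<>("demo-topic", "key", "value");
        System.out.println("after serialization:  " + interceptAfterSerialization(rec));
        System.out.println("before serialization: " + interceptBeforeSerialization(rec));
    }
}
```

Running {{main}} reports {{byte[]}} for the first order and {{String}} for the second, which is the difference the proposed change is meant to achieve.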