[ https://issues.apache.org/jira/browse/KAFKA-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838619#comment-15838619 ]

Francesco Lemma commented on KAFKA-4691:
----------------------------------------

Thanks Matthias. I perfectly understand what you are saying and I agree 100%. 
Maybe I'm not expressing myself clearly. What I'm proposing is to extract the 
call to the interceptors from the {{KafkaProducer}} and move it into the 
{{RecordCollectorImpl}}. To avoid affecting non-Streams implementations, there 
could be an overloaded method in the {{KafkaProducer}} similar to this:

{code:java}
// Overloaded method. This will be called by the RecordCollectorImpl with
// triggerInterceptors = false.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback, boolean triggerInterceptors) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null || !triggerInterceptors
            ? record
            : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

// This method keeps the original signature. All existing callers of
// KafkaProducer.send() keep calling this method and keep the current
// interceptor behaviour.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    return send(record, callback, true);
}
{code}

Then the {{RecordCollectorImpl.send(...)}} method could potentially be modified 
as follows:

{code:java}
    @Override
    public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer,
                            Serializer<V> valueSerializer, StreamPartitioner<K, V> partitioner) {

        // The visibility of the producer's interceptors would need to be changed,
        // or another way to expose them would have to be implemented.
        // This line has been moved here from the KafkaProducer, so the interceptors
        // see the record before the key and value are serialized.
        ProducerRecord<K, V> interceptedRecord = this.producer.interceptors == null
                ? record
                : this.producer.interceptors.onSend(record);

        byte[] keyBytes = keySerializer.serialize(interceptedRecord.topic(), interceptedRecord.key());
        byte[] valBytes = valueSerializer.serialize(interceptedRecord.topic(), interceptedRecord.value());
        Integer partition = interceptedRecord.partition();
        if (partition == null && partitioner != null) {
            List<PartitionInfo> partitions = this.producer.partitionsFor(interceptedRecord.topic());
            if (partitions != null && partitions.size() > 0)
                partition = partitioner.partition(interceptedRecord.key(), interceptedRecord.value(), partitions.size());
        }

        ProducerRecord<byte[], byte[]> serializedRecord =
                new ProducerRecord<>(interceptedRecord.topic(), partition, interceptedRecord.timestamp(), keyBytes, valBytes);
        final String topic = serializedRecord.topic();

        // ...

    }
{code}
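
For completeness, I would assume the elided remainder of the method then hands 
the serialized record to the producer through the new overload with 
{{triggerInterceptors = false}}, so the interceptors are not invoked a second 
time on the already-serialized bytes. A rough sketch only (the callback below 
is just a placeholder for the existing error handling, whose exact shape may 
differ):

{code:java}
// Sketch: pass triggerInterceptors = false so the KafkaProducer does not run
// the interceptors again on the serialized record.
this.producer.send(serializedRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // the existing RecordCollectorImpl error handling would go here
        }
    }
}, false);
{code}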

I would be more than willing to make the changes and send a pull request if it 
makes sense to you guys. I believe the ProducerInterceptor is very valuable 
functionality, and this issue makes it realistically not very useful within 
Kafka Streams.
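
To make the impact concrete, here is a purely hypothetical interceptor of the 
kind this change would enable. It is typed against the application's String 
key/value types; with the current Streams behaviour, {{onSend()}} receives 
already-serialized {{byte[]}} payloads, so the value access below would fail 
(or the interceptor would have to be written against {{byte[]}} and 
deserialize the payload itself):

{code:java}
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical example, not part of the proposal itself.
public class AuditingInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // When configured through Kafka Streams today, the value is a byte[]
        // at runtime, so this access would throw a ClassCastException.
        String value = record.value();
        // ... audit or enrich the value here ...
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
{code}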

> ProducerInterceptor.onSend() is called after key and value are serialized
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4691
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4691
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, streams
>    Affects Versions: 0.10.1.1
>            Reporter: Francesco Lemma
>              Labels: easyfix
>         Attachments: 2017-01-24 00_50_55-SDG_CR33_DevStudio - Java EE - 
> org.apache.kafka.streams.processor.internals.Reco.png
>
>
> According to the JavaDoc 
> (https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html)
>  " This is called from KafkaProducer.send(ProducerRecord) and 
> KafkaProducer.send(ProducerRecord, Callback) methods, before key and value 
> get serialized and partition is assigned (if partition is not specified in 
> ProducerRecord)".
> However, when using this with Kafka Streams 
> (StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)), the 
> key and value contained in the record object are already serialized.
> As you can see from the screenshot, the serialization is performed inside 
> RecordCollectorImpl.send(ProducerRecord<K, V> record, Serializer<K> 
> keySerializer, Serializer<V> valueSerializer,
>                             StreamPartitioner<K, V> partitioner), effectively 
> before calling the send method of the producer, which is what triggers the 
> interceptor.
> This makes it impossible to perform any kind of operation involving the key 
> or value of the message without at least performing an additional 
> deserialization.


