[ 
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16989833#comment-16989833
 ] 

Andrea Cosentino commented on CAMEL-14233:
------------------------------------------

It depends if we consider this a bug. To me this is an improvement so it should 
go in 2.25.0

> camel-kafka - topic overriding not possible when using aggregation
> ------------------------------------------------------------------
>
>                 Key: CAMEL-14233
>                 URL: https://issues.apache.org/jira/browse/CAMEL-14233
>             Project: Camel
>          Issue Type: Improvement
>          Components: camel-kafka
>    Affects Versions: 2.24.2
>            Reporter: Rafał Gała
>            Assignee: Omar Al-Safi
>            Priority: Minor
>             Fix For: 3.1.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> When exchange aggregation is used, using GroupedExchangeAggregationStrategy 
> for example:
>  
> {code:java}
> from(..)
> .process(some processor here that sets KafkaConstants.TOPIC header here)
> .aggregate(new GroupedExchangeAggregationStrategy ())
> .to(kafka:...)
> {code}
> it is not possible to override topic per exchange by using 
> KafkaConstants.TOPIC header, because in *createRecord* of KafkaProducer 
> class, the topic is chosen from header of aggregating Exchange which may not 
> be set because it may have been set only on Exchanges that were aggregated. 
> When creating ProducerRecord from Iterable, the topic should be chosen from 
> header of each Exchange separately:
> {code:java}
> protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws 
> Exception {
>     String topic = endpoint.getConfiguration().getTopic();
>     if (!endpoint.getConfiguration().isBridgeEndpoint()) {
>         String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, 
> String.class);
>         boolean allowHeader = true;
>         // when we do not bridge then detect if we try to send back to 
> ourselves
>         // which we most likely do not want to do
>         if (headerTopic != null && 
> endpoint.getConfiguration().isCircularTopicDetection()) {
>             Endpoint from = exchange.getFromEndpoint();
>             if (from instanceof KafkaEndpoint) {
>                 String fromTopic = ((KafkaEndpoint) 
> from).getConfiguration().getTopic();
>                 allowHeader = !headerTopic.equals(fromTopic);
>                 if (!allowHeader) {
>                     log.debug("Circular topic detected from message header."
>                             + " Cannot send to same topic as the message 
> comes from: {}"
>                             + ". Will use endpoint configured topic: {}", 
> from, topic);
>                 }
>             }
>         }
>         if (allowHeader && headerTopic != null) {
>             topic = headerTopic;
>         }
>     }
>     if (topic == null) {
>         // if topic property was not received from configuration or header 
> parameters take it from the remaining URI
>         topic = URISupport.extractRemainderPath(new 
> URI(endpoint.getEndpointUri()), true);
>     }
> ...
>     Object msg = exchange.getIn().getBody();
>     // is the message body a list or something that contains multiple values
>     Iterator<Object> iterator = null;
>     if (msg instanceof Iterable) {
>         iterator = ((Iterable<Object>) msg).iterator();
>     } else if (msg instanceof Iterator) {
>         iterator = (Iterator<Object>) msg;
>     }
>     if (iterator != null) {
>         final Iterator<Object> msgList = iterator;
> {code}
> The msgTopic variable below should be set from KafkaConstants.TOPIC header of 
> next exchange from collection
> {code}
>         final String msgTopic = topic;
>         return new Iterator<ProducerRecord>() {
>             @Override
>             public boolean hasNext() {
>                 return msgList.hasNext();
>             }
>             @Override
>             public ProducerRecord next() {
>                 // must convert each entry of the iterator into the value 
> according to the serializer
>                 Object next = msgList.next();
>                 Object value = tryConvertToSerializedType(exchange, next, 
> endpoint.getConfiguration().getSerializerClass());
>                 if (hasPartitionKey && hasMessageKey) {
>                     return new ProducerRecord(msgTopic, partitionKey, null, 
> key, value, propagatedHeaders);
>                 } else if (hasMessageKey) {
>                     return new ProducerRecord(msgTopic, null, null, key, 
> value, propagatedHeaders);
>                 } else {
>                     return new ProducerRecord(msgTopic, null, null, null, 
> value, propagatedHeaders);
>                 }
>             }
>             @Override
>             public void remove() {
>                 msgList.remove();
>             }
>         };
>     }
> ...
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to