[
https://issues.apache.org/jira/browse/CAMEL-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rafał Gała reopened CAMEL-14233:
--------------------------------
> camel-kafka - topic overriding not possible when using aggregation
> ------------------------------------------------------------------
>
> Key: CAMEL-14233
> URL: https://issues.apache.org/jira/browse/CAMEL-14233
> Project: Camel
> Issue Type: Improvement
> Components: camel-kafka
> Affects Versions: 2.24.2
> Reporter: Rafał Gała
> Assignee: Omar Al-Safi
> Priority: Minor
> Fix For: 3.1.0
>
> Attachments: kafka2.patch, metadata.patch
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> When exchange aggregation is used, using GroupedExchangeAggregationStrategy
> for example:
>
> {code:java}
> from(..)
> .process(some processor here that sets KafkaConstants.TOPIC header here)
> .aggregate(new GroupedExchangeAggregationStrategy ())
> .to(kafka:...)
> {code}
> it is not possible to override topic per exchange by using
> KafkaConstants.TOPIC header, because in *createRecord* of KafkaProducer
> class, the topic is chosen from header of aggregating Exchange which may not
> be set because it may have been set only on Exchanges that were aggregated.
> When creating ProducerRecord from Iterable, the topic should be chosen from
> header of each Exchange separately:
> {code:java}
> protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws
> Exception {
> String topic = endpoint.getConfiguration().getTopic();
> if (!endpoint.getConfiguration().isBridgeEndpoint()) {
> String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC,
> String.class);
> boolean allowHeader = true;
> // when we do not bridge then detect if we try to send back to
> ourselves
> // which we most likely do not want to do
> if (headerTopic != null &&
> endpoint.getConfiguration().isCircularTopicDetection()) {
> Endpoint from = exchange.getFromEndpoint();
> if (from instanceof KafkaEndpoint) {
> String fromTopic = ((KafkaEndpoint)
> from).getConfiguration().getTopic();
> allowHeader = !headerTopic.equals(fromTopic);
> if (!allowHeader) {
> log.debug("Circular topic detected from message header."
> + " Cannot send to same topic as the message
> comes from: {}"
> + ". Will use endpoint configured topic: {}",
> from, topic);
> }
> }
> }
> if (allowHeader && headerTopic != null) {
> topic = headerTopic;
> }
> }
> if (topic == null) {
> // if topic property was not received from configuration or header
> parameters take it from the remaining URI
> topic = URISupport.extractRemainderPath(new
> URI(endpoint.getEndpointUri()), true);
> }
> ...
> Object msg = exchange.getIn().getBody();
> // is the message body a list or something that contains multiple values
> Iterator<Object> iterator = null;
> if (msg instanceof Iterable) {
> iterator = ((Iterable<Object>) msg).iterator();
> } else if (msg instanceof Iterator) {
> iterator = (Iterator<Object>) msg;
> }
> if (iterator != null) {
> final Iterator<Object> msgList = iterator;
> {code}
> The msgTopic variable below should be set from KafkaConstants.TOPIC header of
> next exchange from collection
> {code}
> final String msgTopic = topic;
> return new Iterator<ProducerRecord>() {
> @Override
> public boolean hasNext() {
> return msgList.hasNext();
> }
> @Override
> public ProducerRecord next() {
> // must convert each entry of the iterator into the value
> according to the serializer
> Object next = msgList.next();
> Object value = tryConvertToSerializedType(exchange, next,
> endpoint.getConfiguration().getSerializerClass());
> if (hasPartitionKey && hasMessageKey) {
> return new ProducerRecord(msgTopic, partitionKey, null,
> key, value, propagatedHeaders);
> } else if (hasMessageKey) {
> return new ProducerRecord(msgTopic, null, null, key,
> value, propagatedHeaders);
> } else {
> return new ProducerRecord(msgTopic, null, null, null,
> value, propagatedHeaders);
> }
> }
> @Override
> public void remove() {
> msgList.remove();
> }
> };
> }
> ...
> }
> {code}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)