[ 
https://issues.apache.org/jira/browse/CAMEL-11215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Claus Ibsen updated CAMEL-11215:
--------------------------------
    Fix Version/s:     (was: 2.18.5)

> Camel Kafka component commits offsets in case of exceptions
> -----------------------------------------------------------
>
>                 Key: CAMEL-11215
>                 URL: https://issues.apache.org/jira/browse/CAMEL-11215
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.18.3
>            Reporter: Roger
>            Assignee: Claus Ibsen
>             Fix For: 2.19.1, 2.20.0
>
>
> My processor in the route throws an exception, but the Kafka component still 
> commits the offsets. 
> My route (heavily redacted and modified):
> {code:title=Route|borderStyle=solid}
> from( "kafka://blah-blah" ).routeId("MyRoute")
>                 .convertBodyTo( MyData.class )
>                 .process( "MyProcessor" )
>                 .to( "DestinationProcessor" );
> {code}
> The exception I get: 
> {code:title=Exception|borderStyle=solid}
>         at com.mycompany.MyProcessor.process(MyProcessor.java:152)
>         at org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:103)
>         at org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
>         at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
>         at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
>         at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>         at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
>         at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
>         at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:117)
>         at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>         at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
>         at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
>         at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>         at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>         at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
>         at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>         at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:140)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.sql.SQLException: Exception occurred while getting connection: oracle.ucp.UniversalConnectionPoolException: Cannot get Connection from Datasource: java.sql.SQLException: Listener refused the connection with the following error:
> ORA-12514, TNS:listener does not currently know of service requested in connect descriptor
> {code}
> Here is the corresponding Kafka component code (KafkaConsumer.java). This 
> part of the code does not seem to handle the exception: the exception handler 
> simply swallows it, and the fall-through code then commits the offsets 
> anyway. Is this a bug, or am I missing something?
> {code:title=KafkaConsumer.java|borderStyle=solid}
> while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
>     ConsumerRecords<Object, Object> allRecords = consumer.poll(pollTimeoutMs);
>     for (TopicPartition partition : allRecords.partitions()) {
>         List<ConsumerRecord<Object, Object>> partitionRecords = allRecords
>             .records(partition);
>         for (ConsumerRecord<Object, Object> record : partitionRecords) {
>             if (LOG.isTraceEnabled()) {
>                 LOG.trace("partition = {}, offset = {}, key = {}, value = {}", record.partition(), record.offset(), record.key(), record.value());
>             }
>             Exchange exchange = endpoint.createKafkaExchange(record);
>             try {
>                 processor.process(exchange);
>             } catch (Exception e) {
>                 getExceptionHandler().handleException("Error during processing", exchange, e);
>             }
>         }
>         // if autocommit is false
>         if (endpoint.getConfiguration().isAutoCommitEnable() != null
>             && !endpoint.getConfiguration().isAutoCommitEnable()) {
>             long partitionLastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
>             consumer.commitSync(Collections.singletonMap(
>                 partition, new OffsetAndMetadata(partitionLastoffset + 1)));
>         }
>     }
> }
> {code}
> Any insights are appreciated.
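
For illustration only, here is a minimal sketch of the behaviour the reporter appears to expect: stop at the first record that fails and commit only past the last record that succeeded. It is not the Camel code and not necessarily how this issue was resolved. It uses plain Java with no Kafka dependency, and the class name, method name, and the failing processor are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import java.util.function.LongConsumer;

// Hypothetical sketch, not part of camel-kafka.
public class OffsetCommitSketch {

    /**
     * Processes the offsets of one partition in order and stops at the first failure.
     * Returns the offset that would be safe to commit (last successful offset + 1),
     * or an empty value if no record was processed successfully.
     */
    public static OptionalLong processAndFindCommitOffset(List<Long> offsets, LongConsumer processor) {
        OptionalLong commitOffset = OptionalLong.empty();
        for (long offset : offsets) {
            try {
                processor.accept(offset);
            } catch (RuntimeException e) {
                // Do not swallow the failure and carry on: stop here so that
                // later records cannot advance the committed position.
                System.err.println("Processing failed at offset " + offset + ": " + e.getMessage());
                break;
            }
            // Same "+ 1" convention as the quoted code: commit the next offset to read.
            commitOffset = OptionalLong.of(offset + 1);
        }
        return commitOffset;
    }

    public static void main(String[] args) {
        List<Long> batch = Arrays.asList(10L, 11L, 12L, 13L);
        // Hypothetical processor that fails on offset 12, standing in for
        // the SQLException in the stack trace above.
        OptionalLong commitOffset = processAndFindCommitOffset(batch, offset -> {
            if (offset == 12L) {
                throw new IllegalStateException("database unavailable");
            }
        });
        System.out.println("Safe to commit: " + commitOffset);
    }
}
```

With the batch in `main`, offsets 10 and 11 succeed and 12 fails, so the safe commit offset is 12 and the failed record would be read again on the next poll. A real consumer would also need to seek back to that offset before polling again, which this sketch leaves out.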



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
