[
https://issues.apache.org/jira/browse/CAMEL-11215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen updated CAMEL-11215:
--------------------------------
Fix Version/s: 2.20.0
2.19.1
2.18.4
> Camel Kafka component commits offsets in case of exceptions
> -----------------------------------------------------------
>
> Key: CAMEL-11215
> URL: https://issues.apache.org/jira/browse/CAMEL-11215
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 2.18.3
> Reporter: Roger
> Assignee: Claus Ibsen
> Fix For: 2.18.4, 2.19.1, 2.20.0
>
>
> My processor in the router throws an exception but the Kafka component still
> commits the offsets.
> My route: (heavily redacted and modified)
> {code:title=Route|borderStyle=solid}
> from( "kafka://blah-blah" ).routeId("MyRoute")
> .convertBodyTo( MyData.class )
> .process( "MyProcessor" )
> .to( "DestinationProcessor" );
> {code}
> The exception I get:
> {code:title=Exception|borderStyle=solid}
> at com.mycompany.MyProcessor.process(MyProcessor.java:152)
> at
> org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:103)
> at
> org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
> at
> org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
> at
> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
> at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
> at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
> at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
> at
> org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:117)
> at
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
> at
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
> at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
> at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
> at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
> at
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
> at
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
> at
> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:140)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.sql.SQLException: Exception occurred while getting
> connection: oracle.ucp.UniversalConnectionPoolException: Cannot get
> Connection from Datasource: java.sql.SQLException: Listener refused the
> connection with the following error:
> ORA-12514, TNS:listener does not currently know of service requested in
> connect descriptor
> {code}
> Here is the corresponding Kafka component code:(KafkaConsumer.java) -This
> part of the code does not seem to handle the exception. The exception handler
> simply eats up the exception and the fall through code happily commits the
> offsets. Is this a bug? or am I missing something?
> {code:title=KafkaConsumer.java|borderStyle=solid}
> while (isRunAllowed() && !isStoppingOrStopped() &&
> !isSuspendingOrSuspended()) {
> ConsumerRecords<Object, Object> allRecords =
> consumer.poll(pollTimeoutMs);
> for (TopicPartition partition : allRecords.partitions()) {
> List<ConsumerRecord<Object, Object>> partitionRecords
> = allRecords
> .records(partition);
> for (ConsumerRecord<Object, Object> record :
> partitionRecords) {
> if (LOG.isTraceEnabled()) {
> LOG.trace("partition = {}, offset = {}, key =
> {}, value = {}", record.partition(), record.offset(), record.key(),
> record.value());
> }
> Exchange exchange =
> endpoint.createKafkaExchange(record);
> try {
> processor.process(exchange);
> } catch (Exception e) {
> getExceptionHandler().handleException("Error
> during processing", exchange, e);
> }
> }
> // if autocommit is false
> if (endpoint.getConfiguration().isAutoCommitEnable()
> != null
> &&
> !endpoint.getConfiguration().isAutoCommitEnable()) {
> long partitionLastoffset =
> partitionRecords.get(partitionRecords.size() - 1).offset();
> consumer.commitSync(Collections.singletonMap(
> partition, new
> OffsetAndMetadata(partitionLastoffset + 1)));
> }
> }
> }
> {code}
> Any insights are appreciated.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)