[ 
https://issues.apache.org/jira/browse/CAMEL-11215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Roger updated CAMEL-11215:
--------------------------
    Description: 
My processor in the router throws an exception but the Kafka component still 
commits the offsets. 

My route: (heavily redacted and modified)
{code:title=Route|borderStyle=solid}
from( "kafka://blah-blah" ).routeId("MyRoute")
                .convertBodyTo( MyData.class )
                .process( "MyProcessor" )
                .to( "DestinationProcessor" );
{code}
The exception I get: 
{code:title=Exception|borderStyle=solid}
        at com.mycompany.MyProcessor.process(MyProcessor.java:152)
        at 
org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:103)
        at 
org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
        at 
org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
        at 
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
        at 
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
        at 
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
        at 
org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:117)
        at 
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
        at 
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
        at 
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
        at 
org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:140)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Exception occurred while getting connection: 
oracle.ucp.UniversalConnectionPoolException: Cannot get Connection from 
Datasource: java.sql.SQLException: Listener refused the connection with the 
following error:
ORA-12514, TNS:listener does not currently know of service requested in connect 
descriptor
{code}
Here is the corresponding Kafka component code:(KafkaConsumer.java) -This part 
of the code does not seem to handle the exception. The exception handler simply 
eats up the exception and the fall through code happily commits the offsets. Is 
this a bug? or am I missing something?

{code:title=KafkaConsumer.java|borderStyle=solid}
while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
                    ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollTimeoutMs);
                    for (TopicPartition partition : allRecords.partitions()) {
                        List<ConsumerRecord<Object, Object>> partitionRecords = 
allRecords
                            .records(partition);
                        for (ConsumerRecord<Object, Object> record : 
partitionRecords) {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("partition = {}, offset = {}, key = 
{}, value = {}", record.partition(), record.offset(), record.key(), 
record.value());
                            }
                            Exchange exchange = 
endpoint.createKafkaExchange(record);
                            try {
                                processor.process(exchange);
                            } catch (Exception e) {
                                getExceptionHandler().handleException("Error 
during processing", exchange, e);
                            }
                        }
                        // if autocommit is false
                        if (endpoint.getConfiguration().isAutoCommitEnable() != 
null
                            && 
!endpoint.getConfiguration().isAutoCommitEnable()) {
                            long partitionLastoffset = 
partitionRecords.get(partitionRecords.size() - 1).offset();
                            consumer.commitSync(Collections.singletonMap(
                                partition, new 
OffsetAndMetadata(partitionLastoffset + 1)));
                        }
                    }
                }
{code}

Any insights are appreciated.


  was:
My processor in the router throws an exception but the Kafka component still 
commits the offsets. 

My route: (heavily redacted and modified)
from( "kafka://blah-blah" ).routeId("MyRoute")
                .convertBodyTo( MyData.class )
                .process( "MyProcessor" )
                .to( "DestinationProcessor" );

The exception I get: 
        at com.mycompany.MyProcessor.process(MyProcessor.java:152)
        at 
org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:103)
        at 
org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
        at 
org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
        at 
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
        at 
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
        at 
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
        at 
org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:117)
        at 
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
        at 
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
        at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
        at 
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
        at 
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
        at 
org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:140)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Exception occurred while getting connection: 
oracle.ucp.UniversalConnectionPoolException: Cannot get Connection from 
Datasource: java.sql.SQLException: Listener refused the connection with the 
following error:
ORA-12514, TNS:listener does not currently know of service requested in connect 
descriptor

Here is the corresponding Kafka component code:(KafkaConsumer.java) -This part 
of the code does not seem to handle the exception. The exception handler simply 
eats up the exception and the fall through code happily commits the offsets. Is 
this a bug? or am I missing something?

while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
                    ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollTimeoutMs);
                    for (TopicPartition partition : allRecords.partitions()) {
                        List<ConsumerRecord<Object, Object>> partitionRecords = 
allRecords
                            .records(partition);
                        for (ConsumerRecord<Object, Object> record : 
partitionRecords) {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("partition = {}, offset = {}, key = 
{}, value = {}", record.partition(), record.offset(), record.key(), 
record.value());
                            }
                            Exchange exchange = 
endpoint.createKafkaExchange(record);
                            try {
                                processor.process(exchange);
                            } catch (Exception e) {
                                getExceptionHandler().handleException("Error 
during processing", exchange, e);
                            }
                        }
                        // if autocommit is false
                        if (endpoint.getConfiguration().isAutoCommitEnable() != 
null
                            && 
!endpoint.getConfiguration().isAutoCommitEnable()) {
                            long partitionLastoffset = 
partitionRecords.get(partitionRecords.size() - 1).offset();
                            consumer.commitSync(Collections.singletonMap(
                                partition, new 
OffsetAndMetadata(partitionLastoffset + 1)));
                        }
                    }
                }

Any insights are appreciated.



> Camel Kafka component commits offsets in case of exceptions
> -----------------------------------------------------------
>
>                 Key: CAMEL-11215
>                 URL: https://issues.apache.org/jira/browse/CAMEL-11215
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-kafka
>    Affects Versions: 2.18.3
>            Reporter: Roger
>
> My processor in the router throws an exception but the Kafka component still 
> commits the offsets. 
> My route: (heavily redacted and modified)
> {code:title=Route|borderStyle=solid}
> from( "kafka://blah-blah" ).routeId("MyRoute")
>                 .convertBodyTo( MyData.class )
>                 .process( "MyProcessor" )
>                 .to( "DestinationProcessor" );
> {code}
> The exception I get: 
> {code:title=Exception|borderStyle=solid}
>         at com.mycompany.MyProcessor.process(MyProcessor.java:152)
>         at 
> org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:103)
>         at 
> org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
>         at 
> org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
>         at 
> org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
>         at 
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>         at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
>         at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
>         at 
> org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:117)
>         at 
> org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
>         at 
> org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
>         at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
>         at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
>         at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
>         at 
> org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
>         at 
> org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
>         at 
> org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:140)
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.sql.SQLException: Exception occurred while getting 
> connection: oracle.ucp.UniversalConnectionPoolException: Cannot get 
> Connection from Datasource: java.sql.SQLException: Listener refused the 
> connection with the following error:
> ORA-12514, TNS:listener does not currently know of service requested in 
> connect descriptor
> {code}
> Here is the corresponding Kafka component code:(KafkaConsumer.java) -This 
> part of the code does not seem to handle the exception. The exception handler 
> simply eats up the exception and the fall through code happily commits the 
> offsets. Is this a bug? or am I missing something?
> {code:title=KafkaConsumer.java|borderStyle=solid}
> while (isRunAllowed() && !isStoppingOrStopped() && 
> !isSuspendingOrSuspended()) {
>                     ConsumerRecords<Object, Object> allRecords = 
> consumer.poll(pollTimeoutMs);
>                     for (TopicPartition partition : allRecords.partitions()) {
>                         List<ConsumerRecord<Object, Object>> partitionRecords 
> = allRecords
>                             .records(partition);
>                         for (ConsumerRecord<Object, Object> record : 
> partitionRecords) {
>                             if (LOG.isTraceEnabled()) {
>                                 LOG.trace("partition = {}, offset = {}, key = 
> {}, value = {}", record.partition(), record.offset(), record.key(), 
> record.value());
>                             }
>                             Exchange exchange = 
> endpoint.createKafkaExchange(record);
>                             try {
>                                 processor.process(exchange);
>                             } catch (Exception e) {
>                                 getExceptionHandler().handleException("Error 
> during processing", exchange, e);
>                             }
>                         }
>                         // if autocommit is false
>                         if (endpoint.getConfiguration().isAutoCommitEnable() 
> != null
>                             && 
> !endpoint.getConfiguration().isAutoCommitEnable()) {
>                             long partitionLastoffset = 
> partitionRecords.get(partitionRecords.size() - 1).offset();
>                             consumer.commitSync(Collections.singletonMap(
>                                 partition, new 
> OffsetAndMetadata(partitionLastoffset + 1)));
>                         }
>                     }
>                 }
> {code}
> Any insights are appreciated.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to