Roger created CAMEL-11215:
-----------------------------
Summary: Camel Kafka component commits offsets in case of
exceptions
Key: CAMEL-11215
URL: https://issues.apache.org/jira/browse/CAMEL-11215
Project: Camel
Issue Type: Bug
Components: camel-kafka
Affects Versions: 2.18.3
Reporter: Roger
My processor in the router throws an exception but the Kafka component still
commits the offsets.
My route: (heavily redacted and modified)
from( "kafka://blah-blah" ).routeId("MyRoute")
.convertBodyTo( MyData.class )
.process( "MyProcessor" )
.to( "DestinationProcessor" );
The exception I get:
at com.mycompany.MyProcessor.process(MyProcessor.java:152)
at
org.apache.camel.impl.ProcessorEndpoint.onExchange(ProcessorEndpoint.java:103)
at
org.apache.camel.impl.ProcessorEndpoint$1.process(ProcessorEndpoint.java:71)
at
org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
at
org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:117)
at
org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
at
org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:542)
at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
at
org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:197)
at
org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:97)
at
org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:140)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Exception occurred while getting connection:
oracle.ucp.UniversalConnectionPoolException: Cannot get Connection from
Datasource: java.sql.SQLException: Listener refused the connection with the
following error:
ORA-12514, TNS:listener does not currently know of service requested in connect
descriptor
Here is the corresponding Kafka component code:(KafkaConsumer.java) -This part
of the code does not seem to handle the exception. The exception handler simply
eats up the exception and the fall through code happily commits the offsets. Is
this a bug? or am I missing something?
while (isRunAllowed() && !isStoppingOrStopped() && !isSuspendingOrSuspended()) {
ConsumerRecords<Object, Object> allRecords =
consumer.poll(pollTimeoutMs);
for (TopicPartition partition : allRecords.partitions()) {
List<ConsumerRecord<Object, Object>> partitionRecords =
allRecords
.records(partition);
for (ConsumerRecord<Object, Object> record :
partitionRecords) {
if (LOG.isTraceEnabled()) {
LOG.trace("partition = {}, offset = {}, key =
{}, value = {}", record.partition(), record.offset(), record.key(),
record.value());
}
Exchange exchange =
endpoint.createKafkaExchange(record);
try {
processor.process(exchange);
} catch (Exception e) {
getExceptionHandler().handleException("Error
during processing", exchange, e);
}
}
// if autocommit is false
if (endpoint.getConfiguration().isAutoCommitEnable() !=
null
&&
!endpoint.getConfiguration().isAutoCommitEnable()) {
long partitionLastoffset =
partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(
partition, new
OffsetAndMetadata(partitionLastoffset + 1)));
}
}
}
Any insights are appreciated.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)