[
https://issues.apache.org/jira/browse/CAMEL-18796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen resolved CAMEL-18796.
---------------------------------
Resolution: Fixed
> camel-kafka: kafka consumer stops in case of an authentication issue
> --------------------------------------------------------------------
>
> Key: CAMEL-18796
> URL: https://issues.apache.org/jira/browse/CAMEL-18796
> Project: Camel
> Issue Type: Bug
> Components: camel-kafka
> Affects Versions: 3.18.0, 3.19.0
> Reporter: Luca Burgazzoli
> Assignee: Otavio Rodolfo Piske
> Priority: Major
> Fix For: 3.18.5, 3.20.0
>
>
> I'm running in a strange behavior of the camle-kafka component in case of a
> glitch/temporary authentication issue. Assuming we have the following code:
> {code:java}
> //usr/bin/env jbang "$0" "$@" ; exit $?
> //
> //DEPS io.quarkus.platform:quarkus-camel-bom:2.14.2.Final@pom
> //DEPS org.apache.camel.quarkus:camel-quarkus-kafka
> //DEPS org.apache.camel.quarkus:camel-quarkus-log
> //DEPS org.apache.camel.quarkus:camel-quarkus-direct
> //
> //JAVAC_OPTIONS -parameters
> //JAVA_OPTIONS -Djava.util.logging.manager=org.jboss.logmanager.LogManager
> //
> import org.apache.camel.ExtendedCamelContext;
> import org.apache.camel.builder.endpoint.EndpointRouteBuilder;
> public class ck extends EndpointRouteBuilder {
> @Override
> public void configure() throws Exception {
> getCamelContext().adapt(ExtendedCamelContext.class)
> .setErrorHandlerFactory(
> deadLetterChannel("direct:dlq")
> );
> var kafka = kafka("demo")
> .brokers("{{test.kafka.broker}}")
> .autoOffsetReset("earliest")
> .securityProtocol("SASL_SSL")
> .saslMechanism("PLAIN")
>
> .saslJaasConfig("org.apache.kafka.common.security.plain.PlainLoginModule
> required username='{{test.kafka.username}}'
> password='{{test.kafka.password}}';");
> from("direct:dlq")
> .to("log:dlq?showAll=true&multiline=true");
> from(kafka)
> .to("log:kafka?showAll=true&multiline=true");
> }
> }
> {code}
> What this route is doing is:
> 1. set-up a global error handler (send to a DLQ)
> 2. poll data from a kafka topic
> If for some reason there is a glitch in the authentication machinery, then
> the KafkaConsumer thread is terminated and no more poll/reconnection attempt
> are made.
> {code}
> 2022-12-05 21:52:48,728 DEBUG
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted
> on 0 records to process
> 2022-12-05 21:52:53,729 DEBUG
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted
> on 0 records to process
> 2022-12-05 21:52:58,730 DEBUG
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted
> on 0 records to process
> 2022-12-05 21:53:03,731 DEBUG
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted
> on 0 records to process
> 2022-12-05 21:53:08,732 DEBUG
> [org.apa.cam.com.kaf.con.sup.KafkaRecordProcessorFacade] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) Last poll on thread demo-Thread 0 resulted
> on 0 records to process
> 2022-12-05 21:53:09,598 INFO [org.apa.kaf.com.net.Selector] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) [Consumer
> clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1,
> groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Failed re-authentication with
> broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77
> (channelId=2147483647) (Authentication failed: credentials for user could not
> be verified)
> 2022-12-05 21:53:09,602 ERROR [org.apa.kaf.cli.NetworkClient] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) [Consumer
> clientId=consumer-9fc21222-980b-4dd7-8e2b-0a228a4f3fe5-1,
> groupId=9fc21222-980b-4dd7-8e2b-0a228a4f3fe5] Connection to node 2147483647
> (broker-0-lb-cos-ce---l--votu-g----ig.bf2.kafka.rhcloud.com/34.247.249.77:443)
> failed authentication due to: Authentication failed: credentials for user
> could not be verified
> 2022-12-05 21:53:09,605 WARN [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) Exception
> org.apache.kafka.common.errors.SaslAuthenticationException caught by thread
> demo-Thread 0 while polling topic demo from kafka: Authentication failed:
> credentials for user could not be verified:
> org.apache.kafka.common.errors.SaslAuthenticationException: Authentication
> failed: credentials for user could not be verified
> 2022-12-05 21:53:09,609 WARN
> [org.apa.cam.com.kaf.con.err.BridgeErrorStrategy] (Camel (camel-1) thread #1
> - KafkaConsumer[demo]) Deferring processing to the exception handler based on
> polling exception strategy
> 2022-12-05 21:53:09,624 DEBUG [org.apa.cam.pro.err.DeadLetterChannel] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) Failed delivery for (MessageId:
> 386B9AF6D607152-0000000000000000 on ExchangeId:
> 386B9AF6D607152-0000000000000000). On delivery attempt: 0 caught:
> org.apache.kafka.common.errors.SaslAuthenticationException: Authentication
> failed: credentials for user could not be verified
> 2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) >>>> direct://dlq
> Exchange[386B9AF6D607152-0000000000000000]
> 2022-12-05 21:53:09,628 DEBUG [org.apa.cam.pro.SendProcessor] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) >>>>
> log://dlq?multiline=true&showAll=true
> Exchange[386B9AF6D607152-0000000000000000]
> 2022-12-05 21:53:09,629 INFO [dlq] (Camel (camel-1) thread #1 -
> KafkaConsumer[demo]) Exchange[
> Id: 386B9AF6D607152-0000000000000000
> ExchangePattern: InOnly
> Properties: {CamelErrorHandlerBridge=true,
> CamelExceptionCaught=org.apache.kafka.common.errors.SaslAuthenticationException:
> Authentication failed: credentials for user could not be verified,
> CamelFailureRouteId=route2, CamelFatalFallbackErrorHandler=[route2],
> CamelToEndpoint=log://dlq?multiline=true&showAll=true}
> Headers: {}
> BodyType: null
> Body: [Body is null]
> CaughtExceptionType:
> org.apache.kafka.common.errors.SaslAuthenticationException
> CaughtExceptionMessage: Authentication failed: credentials for user could not
> be verified StackTrace:
> org.apache.kafka.common.errors.SaslAuthenticationException: Authentication
> failed: credentials for user could not be verified
> ]
> 2022-12-05 21:53:09,635 INFO [org.apa.cam.com.kaf.con.err.SeekUtil] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) Consumer seeking to next offset 1
> to continue polling next message from topic demo on partition 0
> 2022-12-05 21:53:09,636 DEBUG [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) Closing consumer demo-Thread 0
> 2022-12-05 21:53:09,636 DEBUG
> [org.apa.cam.com.kaf.con.sup.PartitionAssignmentListener] (Camel (camel-1)
> thread #1 - KafkaConsumer[demo]) onPartitionsRevoked: demo-Thread 0 from demo
> 2022-12-05 21:53:09,643 INFO [org.apa.cam.com.kaf.KafkaFetchRecords] (Camel
> (camel-1) thread #1 - KafkaConsumer[demo]) Terminating KafkaConsumer thread
> demo-Thread 0 receiving from topic demo
> {code}
>
> However according to the documentation, if the pollOnError is set to
> ERROR_HANDLER as in this case (it is the default), the strategy should use
> Camel’s error handler to process the exception, and afterwards continue to
> poll next message but this does not seems to be the case.
> This seems to be somehow related to:
> - https://issues.apache.org/jira/browse/CAMEL-17424
> -
> https://github.com/apache/camel/commit/55df049a96fd8f52265ef7e7a0cc9ca5a28ab6b3
--
This message was sent by Atlassian Jira
(v8.20.10#820010)