[ https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616014#comment-17616014 ]
Greg Harris commented on KAFKA-14102:
-------------------------------------

From a cursory investigation, this appears to be a pitfall of using dynamically-loaded security providers in a multi-classloader environment, and does not seem specific to the Kafka clients. I also don't immediately see any way to replace the Sasl/Security classes with classloader-aware implementations, though I have a feeling it's possible.

There are a couple of different workarounds that you may consider trying on your end, if you haven't already found one:
* Disable co-location of apps which use Kafka inside the JVM
* Move the Kafka Client libraries into a single shared ClassLoader for all of the apps
* Repackage the authentication.* classes into a different jar and move those to a single shared classloader for all of the apps

I'll remove the KafkaConnect label, as this affects all users of clients, and not Connect in particular.

> (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first started app can consume messages
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14102
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14102
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, KafkaConnect
>    Affects Versions: 3.0.1
>            Reporter: Shuo Chen
>            Priority: Blocker
>
> We have 2 web applications (A and B) that consume messages from the same Kafka server, so they have the same configuration:
> {code:java}
> security.protocol=SASL_SSL
> sasl.mechanism=OAUTHBEARER
> sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
> jaas.enabled=true{code}
>
> A and B are deployed together in one Tomcat server (meaning they run in the same JVM process), and the startup sequence is A -> B. We then find that B cannot consume messages and fails with the following exception:
> {code:java}
> [2022-07-22 02:52:45,184] [ INFO] 6 [org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] o.a.k.c.n.SaslChannelBuilder - - [Consumer clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to
> org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
> Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
>     at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182) ~[kafka-clients-3.0.1.jar:?]
>     at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
>     at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219) ~[kafka-clients-3.0.1.jar:?]
>     ... suppressed 2 lines
>     at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.common.network.Selector.connect(Selector.java:256) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) ~[kafka-clients-3.0.1.jar:?]
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.1.jar:?]
>     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522) ~[spring-kafka-2.8.6.jar:2.8.6]
>     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1512) ~[spring-kafka-2.8.6.jar:2.8.6]
>     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1340) ~[spring-kafka-2.8.6.jar:2.8.6]
>     at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]{code}
>
> I have also debugged the issue and found that the key error is:
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
> {code}
> Application A starts up first and registers OAuthBearerSaslClientProvider in the static block of the OAuthBearerLoginModule class, so OAuthBearerSaslClientProvider is loaded and initialized by the ParallelWebappClassLoader for Application A.
> {code:java}
> static {
>     OAuthBearerSaslClientProvider.initialize(); // not part of public API
>     OAuthBearerSaslServerProvider.initialize(); // not part of public API
> } {code}
> Application B then starts up and tries to register OAuthBearerSaslClientProvider again with the same code, but this has no effect because the provider was already registered by Application A.
> When Application B tries to create the SaslClient, the AuthenticateCallbackHandler class is loaded by the ParallelWebappClassLoader for Application A, while the OAuthBearerSaslClientCallbackHandler class is loaded by the one for Application B.
>
>
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
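
As a rough illustration of the registration behaviour described in the issue (this is not Kafka's code), the sketch below registers two provider instances under the same name with java.security.Security. DemoProvider, the name "DemoSaslProvider", and the owner labels "A" and "B" are hypothetical stand-ins for the provider as each web application would register it. The sketch is simplified in one respect: both instances here come from a single classloader, whereas in the Tomcat case each app loads its own copy of the class. Security.addProvider returns -1 when a provider with that name is already installed, so the second registration is ignored and lookups keep returning the first instance.

```java
import java.security.Provider;
import java.security.Security;

class ProviderRegistrationDemo {

    // Hypothetical provider; "owner" records which app created the instance.
    static final class DemoProvider extends Provider {
        private static final long serialVersionUID = 1L;
        final String owner;

        @SuppressWarnings("deprecation") // this constructor is also available on Java 8
        DemoProvider(String owner) {
            super("DemoSaslProvider", 1.0, "illustrative provider owned by " + owner);
            this.owner = owner;
        }
    }

    // Registers two providers with the same name and reports what the JVM kept.
    static String registerTwice() {
        DemoProvider fromAppA = new DemoProvider("A");
        DemoProvider fromAppB = new DemoProvider("B");
        try {
            int first = Security.addProvider(fromAppA);   // installed: returns its position
            int second = Security.addProvider(fromAppB);  // same name: returns -1
            DemoProvider active = (DemoProvider) Security.getProvider("DemoSaslProvider");
            return (first > 0) + "," + second + "," + active.owner;
        } finally {
            // Clean up so the demo leaves the JVM-wide provider list unchanged.
            Security.removeProvider("DemoSaslProvider");
        }
    }

    public static void main(String[] args) {
        System.out.println(registerTwice()); // prints "true,-1,A"
    }
}
```

Because the provider list is JVM-wide rather than per-classloader, whichever app registers first supplies the factory that later apps end up using, which is consistent with the shared-classloader workarounds suggested in the comment.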