[ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17616014#comment-17616014
 ] 

Greg Harris commented on KAFKA-14102:
-------------------------------------

From a cursory investigation, this appears to be a pitfall of using 
dynamically-loaded security providers in a multi-classloader environment, and 
does not seem specific to the Kafka clients.
I also don't immediately see any way to replace the Sasl/Security classes with 
classloader-aware implementations, though I have a feeling it's possible.
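
As a rough illustration of the pitfall (this is not Kafka code), the sketch 
below shows why a second registration is ignored: java.security.Security keeps 
one JVM-wide provider list keyed by name, so whichever copy registers first 
stays installed. DemoProvider and the name "Demo-SASL-Provider" are made up 
for this example; the two instances stand in for the copies of a provider 
class that two webapp classloaders would each define.

```java
import java.security.Provider;
import java.security.Security;

public class ProviderRegistryDemo {

    /** Hypothetical provider; stands in for a provider class loaded once per webapp. */
    static final class DemoProvider extends Provider {
        private static final long serialVersionUID = 1L;

        @SuppressWarnings("deprecation") // the double-version constructor also exists on Java 8
        DemoProvider() {
            super("Demo-SASL-Provider", 1.0, "illustration only");
        }
    }

    /** Registers two same-named providers and reports whether the JVM kept the first. */
    static boolean firstRegistrationWins() {
        Provider fromAppA = new DemoProvider();
        Provider fromAppB = new DemoProvider();
        try {
            int positionA = Security.addProvider(fromAppA); // preference position on success
            int positionB = Security.addProvider(fromAppB); // -1: that name is already installed
            Provider installed = Security.getProvider("Demo-SASL-Provider");
            return positionA > 0 && positionB == -1 && installed == fromAppA;
        } finally {
            Security.removeProvider("Demo-SASL-Provider"); // leave the JVM as we found it
        }
    }

    public static void main(String[] args) {
        System.out.println("first registration wins: " + firstRegistrationWins());
    }
}
```

In a container such as Tomcat, the instance that stays installed belongs to 
the first webapp's classloader, so every later lookup in the JVM resolves to 
that webapp's copy of the classes.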

There are a few workarounds that you could try on your end, if you haven't 
already found one:
 * Avoid co-locating apps which use Kafka inside the same JVM
 * Move the Kafka Client libraries into a single shared ClassLoader for all of 
the apps
 * Repackage the authentication.* classes into a different jar and move those 
to a single shared classloader for all of the apps
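
Before picking a workaround, it may help to confirm that the cast failure 
really comes from duplicate class definitions. The helper below is only a 
diagnostic sketch: ClassLoaderCheck and its method names are hypothetical, and 
in a real application you would pass the expected interface and the handler's 
runtime class instead of the JDK classes used in main.

```java
public class ClassLoaderCheck {

    /** Describes where a class was defined; a null loader means the bootstrap loader. */
    static String describe(Class<?> type) {
        ClassLoader loader = type.getClassLoader();
        return type.getName() + " <- " + (loader == null ? "bootstrap" : loader.toString());
    }

    /** True if the type, a superclass, or an implemented interface has the given name. */
    static boolean hasSupertypeNamed(Class<?> type, String name) {
        if (type == null) {
            return false;
        }
        if (type.getName().equals(name)) {
            return true;
        }
        for (Class<?> iface : type.getInterfaces()) {
            if (hasSupertypeNamed(iface, name)) {
                return true;
            }
        }
        return hasSupertypeNamed(type.getSuperclass(), name);
    }

    /**
     * True when 'actual' is not assignable to 'expected' even though it has a
     * supertype with the same name, which points at two copies of that type
     * defined by different classloaders.
     */
    static boolean isLoaderMismatch(Class<?> expected, Class<?> actual) {
        if (expected.isAssignableFrom(actual)) {
            return false;
        }
        return hasSupertypeNamed(actual, expected.getName());
    }

    public static void main(String[] args) {
        System.out.println(describe(String.class));
        System.out.println(describe(ClassLoaderCheck.class));
        System.out.println(isLoaderMismatch(CharSequence.class, String.class));
    }
}
```

Logging describe(...) for both the interface and the handler class from each 
webapp shows which classloader defined each copy.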

I'll remove the KafkaConnect label, as this affects all users of the clients 
and is not specific to Connect.

> (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first 
> started app can consume messages
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14102
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14102
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, KafkaConnect
>    Affects Versions: 3.0.1
>            Reporter: Shuo Chen
>            Priority: Blocker
>
> We have 2 web applications (A and B) that consume messages from the same 
> Kafka server, so they have the same configuration:
> {code:java}
> security.protocol=SASL_SSL
> sasl.mechanism=OAUTHBEARER
> sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
> sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
> jaas.enabled=true{code}
>  
> A and B are deployed together in one Tomcat server (meaning they run in the 
> same JVM process), and the startup sequence is A -> B. We then find that B 
> cannot consume messages and fails with the following exception:
> {code:java}
> [2022-07-22 02:52:45,184] [ INFO] 6 [org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] o.a.k.c.n.SaslChannelBuilder             -  - [Consumer clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
> org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
> Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
> at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182) ~[kafka-clients-3.0.1.jar:?]
> at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
> at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219) ~[kafka-clients-3.0.1.jar:?]
> ... suppressed 2 lines
> at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.connect(Selector.java:256) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.1.jar:?]
> at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522) ~[spring-kafka-2.8.6.jar:2.8.6]
> at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1512) ~[spring-kafka-2.8.6.jar:2.8.6]
> at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1340) ~[spring-kafka-2.8.6.jar:2.8.6]
> at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252) ~[spring-kafka-2.8.6.jar:2.8.6]{code}
>  
> I also debugged the issue and found that the key error is: 
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
>  {code}
> Application A starts up first and registers OAuthBearerSaslClientProvider in 
> the static block of the OAuthBearerLoginModule class, so 
> OAuthBearerSaslClientProvider is loaded and initialized by the 
> ParallelWebappClassLoader for Application A.
> {code:java}
> static {
>     OAuthBearerSaslClientProvider.initialize(); // not part of public API
>     OAuthBearerSaslServerProvider.initialize(); // not part of public API
> } {code}
> Application B then starts up and tries to register 
> OAuthBearerSaslClientProvider again with the same code, but this has no 
> effect because the provider was already registered by Application A.
> When Application B tries to create the SaslClient, the 
> AuthenticateCallbackHandler class is loaded from the classloader for 
> Application A, while the OAuthBearerSaslClientCallbackHandler class is 
> loaded from the classloader for Application B.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
