[ 
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuo Chen updated KAFKA-14102:
------------------------------
    Description: 
We have 2 web applications (A and B) will consume messages from the same Kafka 
Server,  so they have the same configurations:

 
{code:java}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required; 
sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler 
jaas.enabled=true{code}
 

 

A and B deployed together in one Tomcat server (means they are in JVM process), 
startup  sequential is A -> B,  then we find B cannot consume the message with 
following exception:

 
{code:java}
[2022-07-22 02:52:45,184] [ INFO] 6 
[org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure 
SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be 
castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
 ~[kafka-clients-3.0.1.jar:?]
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
 ~[kafka-clients-3.0.1.jar:?]
... suppressed 2 lines
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) 
~[kafka-clients-3.0.1.jar:?]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522)
 ~[spring-kafka-2.8.6.jar:2.8.6]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1512)
 ~[spring-kafka-2.8.6.jar:2.8.6]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1340)
 ~[spring-kafka-2.8.6.jar:2.8.6]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
 ~[spring-kafka-2.8.6.jar:2.8.6]{code}
 

 

 

 

 

  was:
We have 2 web applications (A and B) will consume messages from the same Kafka 
Server,  so they have the same configurations:

 
{code:java}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required; 
sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler 
jaas.enabled=true{code}
 

 

A and B deployed together in one Tomcat server (means they are in JVM process), 
startup  sequential is A -> B,  then we find B cannot consume the message with 
following exception:

 
{code:java}
[2022-07-22 02:52:45,184] [ INFO] 6 
[org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure 
SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be 
castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
at 
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
 ~[kafka-clients-3.0.1.jar:?]
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
 ~[kafka-clients-3.0.1.jar:?]
... suppressed 2 lines
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981) 
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
 ~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
 ~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) 
~[kafka-clients-3.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) 
~[kafka-clients-3.0.1.jar:?]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522)
 ~[spring-kafka-2.8.6.jar:2.8.6]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1512)
 ~[spring-kafka-2.8.6.jar:2.8.6]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1340)
 ~[spring-kafka-2.8.6.jar:2.8.6]
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
 ~[spring-kafka-2.8.6.jar:2.8.6]{code}
 

 

 

 

 


> (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first 
> started app can consume messages
> ------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14102
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14102
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, KafkaConnect
>    Affects Versions: 3.0.1
>            Reporter: Shuo Chen
>            Priority: Major
>
> We have 2 web applications (A and B) will consume messages from the same 
> Kafka Server,  so they have the same configurations:
>  
> {code:java}
> security.protocol=SASL_SSL
> sasl.mechanism=OAUTHBEARER
> sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
>  required; 
> sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler 
> jaas.enabled=true{code}
>  
>  
> A and B deployed together in one Tomcat server (means they are in JVM 
> process), startup  sequential is A -> B,  then we find B cannot consume the 
> message with following exception:
>  
> {code:java}
> [2022-07-22 02:52:45,184] [ INFO] 6 
> [org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1] 
> o.a.k.c.n.SaslChannelBuilder             -  - [Consumer 
> clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1, 
> groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to 
> org.apache.kafka.common.errors.SaslAuthenticationException: Failed to 
> configure SaslClientAuthenticator
> Caused by: java.lang.IllegalArgumentException: Callback handler must be 
> castable to 
> org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
> at 
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
>  ~[kafka-clients-3.0.1.jar:?]
> at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
>  ~[kafka-clients-3.0.1.jar:?]
> ... suppressed 2 lines
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329) 
> ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.connect(Selector.java:256) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
>  ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
>  ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) 
> ~[kafka-clients-3.0.1.jar:?]
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522)
>  ~[spring-kafka-2.8.6.jar:2.8.6]
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1512)
>  ~[spring-kafka-2.8.6.jar:2.8.6]
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1340)
>  ~[spring-kafka-2.8.6.jar:2.8.6]
> at 
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
>  ~[spring-kafka-2.8.6.jar:2.8.6]{code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to