[
https://issues.apache.org/jira/browse/KAFKA-14102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Shuo Chen updated KAFKA-14102:
------------------------------
Description:
We have 2 web applications (A and B) will consume messages from the same Kafka
Server, so they have the same configurations:
{code:java}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
required;
sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
jaas.enabled=true{code}
A and B deployed together in one Tomcat server (means they are in JVM process),
startup sequential is A -> B, then we find B cannot consume the message with
following exception:
{code:java}
[2022-07-22 02:52:45,184] [ INFO] 6
[org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1]
o.a.k.c.n.SaslChannelBuilder - - [Consumer
clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1,
groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure
SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be
castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler:
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
at
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
~[kafka-clients-3.0.1.jar:?]
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
at
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
~[kafka-clients-3.0.1.jar:?]
... suppressed 2 lines
at
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981)
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
~[kafka-clients-3.0.1.jar:?]
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522)
~[spring-kafka-2.8.6.jar:2.8.6]
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1512)
~[spring-kafka-2.8.6.jar:2.8.6]
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1340)
~[spring-kafka-2.8.6.jar:2.8.6]
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
~[spring-kafka-2.8.6.jar:2.8.6]{code}
was:
We have 2 web applications (A and B) will consume messages from the same Kafka
Server, so they have the same configurations:
{code:java}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
required;
sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
jaas.enabled=true{code}
A and B deployed together in one Tomcat server (means they are in JVM process),
startup sequential is A -> B, then we find B cannot consume the message with
following exception:
{code:java}
[2022-07-22 02:52:45,184] [ INFO] 6
[org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1]
o.a.k.c.n.SaslChannelBuilder - - [Consumer
clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1,
groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to
org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure
SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be
castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler:
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
at
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
~[kafka-clients-3.0.1.jar:?]
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
at
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
~[kafka-clients-3.0.1.jar:?]
... suppressed 2 lines
at
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981)
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
~[kafka-clients-3.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
~[kafka-clients-3.0.1.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
~[kafka-clients-3.0.1.jar:?]
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522)
~[spring-kafka-2.8.6.jar:2.8.6]
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1512)
~[spring-kafka-2.8.6.jar:2.8.6]
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1340)
~[spring-kafka-2.8.6.jar:2.8.6]
at
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
~[spring-kafka-2.8.6.jar:2.8.6]{code}
> (SASL/OAUTHBEARER) multiple applications in one JVM process, only the first
> started app can consume messages
> ------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-14102
> URL: https://issues.apache.org/jira/browse/KAFKA-14102
> Project: Kafka
> Issue Type: Bug
> Components: clients, KafkaConnect
> Affects Versions: 3.0.1
> Reporter: Shuo Chen
> Priority: Major
>
> We have 2 web applications (A and B) will consume messages from the same
> Kafka Server, so they have the same configurations:
>
> {code:java}
> security.protocol=SASL_SSL
> sasl.mechanism=OAUTHBEARER
> sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
> required;
> sasl.login.callback.handler.class=MyOauth2AuthenticateCallbackHandler
> jaas.enabled=true{code}
>
>
> A and B deployed together in one Tomcat server (means they are in JVM
> process), startup sequential is A -> B, then we find B cannot consume the
> message with following exception:
>
> {code:java}
> [2022-07-22 02:52:45,184] [ INFO] 6
> [org.springframework.kafka.KafkaListenerEndpointContainer#5-0-C-1]
> o.a.k.c.n.SaslChannelBuilder - - [Consumer
> clientId=consumer-XXX-7d8650290c70c1fc3da6305099bde64c-1,
> groupId=XXX-7d8650290c70c1fc3da6305099bde64c] Failed to create channel due to
> org.apache.kafka.common.errors.SaslAuthenticationException: Failed to
> configure SaslClientAuthenticator
> Caused by: java.lang.IllegalArgumentException: Callback handler must be
> castable to
> org.apache.kafka.common.security.auth.AuthenticateCallbackHandler:
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler
> at
> org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient$OAuthBearerSaslClientFactory.createSaslClient(OAuthBearerSaslClient.java:182)
> ~[kafka-clients-3.0.1.jar:?]
> at javax.security.sasl.Sasl.createSaslClient(Sasl.java:420) ~[?:1.8.0_332]
> at
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:219)
> ~[kafka-clients-3.0.1.jar:?]
> ... suppressed 2 lines
> at
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:215)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:206)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:286)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:228)
> ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:143)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:236)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:338)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.common.network.Selector.registerChannel(Selector.java:329)
> ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.common.network.Selector.connect(Selector.java:256)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:981)
> ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1152)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1040)
> ~[kafka-clients-3.0.1.jar:?]
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> ~[kafka-clients-3.0.1.jar:?]
> at
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1522)
> ~[spring-kafka-2.8.6.jar:2.8.6]
> at
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1512)
> ~[spring-kafka-2.8.6.jar:2.8.6]
> at
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1340)
> ~[spring-kafka-2.8.6.jar:2.8.6]
> at
> org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
> ~[spring-kafka-2.8.6.jar:2.8.6]{code}
>
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)