Hi ,

I have set up a Kafka cluster on my linux machine secured using keycloak 
(OAUTHBEARER) Mechanism. I can use the Kafka Console Consumers and Producers to 
send and receive messages.

I have tried to connect to Kafka from my consumers and producers deployed as 
module on the wildfly App server . I have set up all the required configuration 
(Config Section below)
The SASL_JAAS_CONFIG has the details like 
(apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required 
LoginStringClaim_sub='kafka-client');

I am able to get authenticated with the broker , but in the client callback I 
am getting an Unsupported Callback error . I have 3 modules in wildfly
1) kafka producer consumer code dependent on the 2) oauth jar (for 
logincallbackhandler and login module)  dependent on the 3) kafka-client jar 
(2.8.0)]

I can see that the THE CLIENT CALL BACK IS CLIENTCREDENTIAL INSTEAD OF 
OAuthBearerTokenCallback. The saslclient is getting set as AbstractSaslClient 
instead of OAuthBearerSaslClient.

Can I get any pointers on this one ?

LOGS

rg.apache.kafka.common.errors.SaslAuthenticationException: An error: 
(java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
ELY05176: Unsupported callback [Caused by 
javax.security.auth.callback.UnsupportedCallbackException]) occurred when 
evaluating SASL token received from the Kafka Broker. Kafka Client will go to 
AUTHENTICATION_FAILED state.
Caused by: javax.security.sasl.SaslException: ELY05176: Unsupported callback 
[Caused by javax.security.auth.callback.UnsupportedCallbackException]
                at 
org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.mechanism.oauth2.OAuth2Client.getInitialResponse(OAuth2Client.java:58)<mailto:org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.mechanism.oauth2.OAuth2Client.getInitialResponse(OAuth2Client.java:58)>
                at 
org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.sasl.oauth2.OAuth2SaslClient.evaluateMessage(OAuth2SaslClient.java:62)<mailto:org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.sasl.oauth2.OAuth2SaslClient.evaluateMessage(OAuth2SaslClient.java:62)>
                at 
org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.sasl.util.AbstractSaslParticipant.evaluateMessage(AbstractSaslParticipant.java:219)<mailto:org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.sasl.util.AbstractSaslParticipant.evaluateMessage(AbstractSaslParticipant.java:219)>
                at 
org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.sasl.util.AbstractSaslClient.evaluateChallenge(AbstractSaslClient.java:98)<mailto:org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.sasl.util.AbstractSaslClient.evaluateChallenge(AbstractSaslClient.java:98)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)>
                at java.base/java.security.AccessController.doPrivileged(Native 
Method)
                at java.base/javax.security.auth.Subject.doAs(Subject.java:423)
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.network.Selector.poll(Selector.java:481)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.network.Selector.poll(Selector.java:481)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)>
                at 
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)<mailto:org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)>
                at 
com.lgc.common.core//com.lgc.dsl.notifications.consumer.DataChangeNoticeKafkaConsumer.poll(DataChangeNoticeKafkaConsumer.java:388)
                at 
com.lgc.common.core//com.lgc.dsds.notifications.producer.DataChangeNotificationProducer.consumeNotification(DataChangeNotificationProducer.java:204)
                at 
com.lgc.common.core//com.lgc.dsds.notifications.producer.DataChangeNotificationProducer.retrieveAndProcessNotificationObject(DataChangeNotificationProducer.java:106)
                at 
com.lgc.common.core//com.lgc.dsds.notifications.producer.DataChangeNotificationProducer.run(DataChangeNotificationProducer.java:75)
                at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: javax.security.auth.callback.UnsupportedCallbackException
                at 
com.lgc.common.koauth//com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler.handle(AuthOBearerSaslClientCallbackHandler.java:91)
                at 
org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.mechanism._private.MechanismUtil.handleCallbacks(MechanismUtil.java:156)<mailto:org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.mechanism._private.MechanismUtil.handleCallbacks(MechanismUtil.java:156)>
                at 
org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.mechanism.oauth2.OAuth2Client.getInitialResponse(OAuth2Client.java:56)<mailto:org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.mechanism.oauth2.OAuth2Client.getInitialResponse(OAuth2Client.java:56)>
                ... 27 more


LOGS WHERE THE SSL HANDSHAKE IS SUCCESSFUL ,  THE CLIENT CALL BACK IS 
CLIENTCREDENTIAL INSTEAD OF OAuthBearerTokenCallback. The saslclient is getting 
set as AbstractSaslClient instead of OAuthBearerSaslClient

21-08-29 16:21:25,756 DEBUG [io.undertow.request] (management I/O-1) Upgrading 
request HttpServerExchange{ GET /}
2021-08-29 16:21:25,760 DEBUG 
[org.apache.kafka.common.network.SslTransportLayer] (OWNotificationProducer) 
[SslTransportLayer channelId=-1 
key=channel=java.nio.channels.SocketChannel[connection-pending 
remote=i-10-134-194-96/10.134.194.96:9093], 
selector=sun.nio.ch.EPollSelectorImpl@50326a63<mailto:selector=sun.nio.ch.EPollSelectorImpl@50326a63>,
 interestOps=8, readyOps=0] SSL handshake completed successfully with peerHost 
'i-10-134-194-96' peerPort 9093 peerPrincipal 'CN=i-10-134-194-96, OU=Foo, 
O=acme corp, L=Duckburg, ST=Duckburg, C=WD' cipherSuite 
'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384'
2021-08-29 16:21:25,765 DEBUG 
[org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] 
(OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4, 
groupId=OpenWorksConsumer] Set SASL client state to RECEIVE_APIVERSIONS_RESPONSE
2021-08-29 16:21:25,766 DEBUG 
[org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] 
(OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4, 
groupId=OpenWorksConsumer] Set SASL client state to SEND_HANDSHAKE_REQUEST
2021-08-29 16:21:25,767 DEBUG [org.apache.kafka.clients.NetworkClient] 
(kafka-producer-network-thread | CommonKafkaProducer) [Producer 
clientId=CommonKafkaProducer] Give up sending metadata request since no node is 
available
2021-08-29 16:21:25,767 DEBUG 
[org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] 
(OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4, 
groupId=OpenWorksConsumer] Set SASL client state to RECEIVE_HANDSHAKE_RESPONSE
2021-08-29 16:21:25,768 DEBUG 
[org.apache.kafka.common.security.authenticator.SaslClientAuthenticator] 
(OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4, 
groupId=OpenWorksConsumer] Set SASL client state to INITIAL
2021-08-29 16:21:25,769 INFO  
[com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler] 
(OWNotificationProducer) The class loaders are as follows ************ 
Callbackclienthandler class 
com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler 
ModuleClassLoader for Module "com.lgc.common.koauth" from local module loader 
@6253c26 (finder: local module finder @49049a04 (roots: 
/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))
2021-08-29 16:21:25,778 INFO  
[com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler] 
(OWNotificationProducer) The class loaders are as follows ************ 
OAuthBearerTokenCallback class 
org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback 
ModuleClassLoader for Module "org.apache.kafka.clients" version 1.1.8.1 from 
local module loader @6253c26 (finder: local module finder @49049a04 (roots: 
/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))
2021-08-29 16:21:25,787 INFO  
[com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler] 
(OWNotificationProducer) The class loaders are as follows ************ 
OAuthBearerLoginModule class 
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule 
ModuleClassLoader for Module "org.apache.kafka.clients" version 1.1.8.1 from 
local module loader @6253c26 (finder: local module finder @49049a04 (roots: 
/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))
2021-08-29 16:21:25,800 DEBUG 
[org.jboss.jca.core.connectionmanager.pool.validator.ConnectionValidator] 
(ConnectionValidator) Notifying pools, interval: 500
2021-08-29 16:21:25,800 DEBUG 
[org.jboss.jca.core.connectionmanager.pool.strategy.PoolBySubject] 
(ConnectionValidator) Checking for connection within frequency
2021-08-29 16:21:25,796 INFO  
[com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler] 
(OWNotificationProducer) ******************* The callbacls are 
org.wildfly.security.auth.callback.CredentialCallback@2b29cf23<mailto:org.wildfly.security.auth.callback.CredentialCallback@2b29cf23>
 ClassName class 
org.wildfly.security.auth.callback.CredentialCallbackModuleClassLoader for 
Module "org.wildfly.security.elytron-private" version 1.11.4.Final from local 
module loader @6253c26 (finder: local module finder @49049a04 (roots: 
/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))
2021-08-29 16:21:25,803 DEBUG 
[org.jboss.jca.core.connectionmanager.pool.strategy.PoolBySubject] 
(ConnectionValidator) Returning for connection within frequency
2021-08-29 16:21:25,803 DEBUG 
[org.jboss.jca.core.connectionmanager.pool.strategy.PoolBySubject] 
(ConnectionValidator) Checking for connection within frequency
2021-08-29 16:21:25,805 INFO  [org.apache.kafka.common.network.Selector] 
(OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4, 
groupId=OpenWorksConsumer] Failed authentication with 
i-10-134-194-96/10.134.194.96 (An error: 
(java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
ELY05176: Unsupported callback [Caused by 
javax.security.auth.callback.UnsupportedCallbackException]) occurred when 
evaluating SASL token received from the Kafka Broker. Kafka Client will go to 
AUTHENTICATION_FAILED state.)
2021-08-29 16:21:25,809 DEBUG [org.apache.kafka.clients.NetworkClient] 
(OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4, 
groupId=OpenWorksConsumer] Node -1 disconnected.


Config

2021-08-29 16:21:25,119 INFO  
[org.apache.kafka.clients.consumer.ConsumerConfig] (OWNotificationProducer) 
ConsumerConfig values:
                allow.auto.create.topics = true
                auto.commit.interval.ms = 5000
                auto.offset.reset = latest
                bootstrap.servers = [i-10-134-194-96:9093]
                check.crcs = true
                client.dns.lookup = use_all_dns_ips
                client.id = consumer-OpenWorksConsumer-2
                client.rack =
                connections.max.idle.ms = 540000
                default.api.timeout.ms = 60000
                enable.auto.commit = false
                exclude.internal.topics = true
                fetch.max.bytes = 52428800
                fetch.max.wait.ms = 500
                fetch.min.bytes = 1
                group.id = OpenWorksConsumer
                group.instance.id = null
                heartbeat.interval.ms = 3000
                interceptor.classes = []
                internal.leave.group.on.close = true
                internal.throw.on.fetch.stable.offset.unsupported = false
                isolation.level = read_uncommitted
                key.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer
                max.partition.fetch.bytes = 1048576
                max.poll.interval.ms = 330000
                max.poll.records = 100
                metadata.max.age.ms = 300000
                metric.reporters = []
                metrics.num.samples = 2
                metrics.recording.level = INFO
                metrics.sample.window.ms = 30000
                partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
                receive.buffer.bytes = 65536
                reconnect.backoff.max.ms = 1000
                reconnect.backoff.ms = 50
                request.timeout.ms = 30000
                retry.backoff.ms = 100
                sasl.client.callback.handler.class = class 
com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler
                sasl.jaas.config = [hidden]
                sasl.kerberos.kinit.cmd = /usr/bin/kinit
                sasl.kerberos.min.time.before.relogin = 60000
                sasl.kerberos.service.name = null
                sasl.kerberos.ticket.renew.jitter = 0.05
                sasl.kerberos.ticket.renew.window.factor = 0.8
                sasl.login.callback.handler.class = class 
com.oauth2.security.oauthbearer.OAuthAuthenticateLoginCallbackHandler
                sasl.login.class = null
                sasl.login.refresh.buffer.seconds = 300
                sasl.login.refresh.min.period.seconds = 60
                sasl.login.refresh.window.factor = 0.8
                sasl.login.refresh.window.jitter = 0.05
                sasl.mechanism = OAUTHBEARER
                security.protocol = SASL_SSL
                security.providers = null
                send.buffer.bytes = 131072
                session.timeout.ms = 60000
                socket.connection.setup.timeout.max.ms = 30000
                socket.connection.setup.timeout.ms = 10000
                ssl.cipher.suites = null
                ssl.enabled.protocols = [TLSv1.2]
                ssl.endpoint.identification.algorithm =
                ssl.engine.factory.class = null
                ssl.key.password = null
                ssl.keymanager.algorithm = SunX509
                ssl.keystore.certificate.chain = null
                ssl.keystore.key = null
                ssl.keystore.location = null
                ssl.keystore.password = null
                ssl.keystore.type = JKS
                ssl.protocol = TLSv1.2
                ssl.provider = null
                ssl.secure.random.implementation = null
                ssl.trustmanager.algorithm = PKIX
                ssl.truststore.certificates = null
                ssl.truststore.location = 
/opt/Landmark/new_certs/securityserver.keystore
                ssl.truststore.password = null
                ssl.truststore.type = JKS
                value.deserializer = class 
org.apache.kafka.common.serialization.StringDeserializer

Regards,
Shankar

Reply via email to