Hi ,


I have set up a Kafka cluster on my linux machine secured using keycloak
(OAUTHBEARER) Mechanism. I can use the Kafka Console Consumers and
Producers to send and receive messages.



I have tried to connect to Kafka from my consumers and producers deployed
as module on the wildfly App serve (version 19, java 11) . I have set up
all the required configuration (Config Section at the bottom) .


The SASL_JAAS_CONFIG provided as consumerconfig option  has the details
like (apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
required LoginStringClaim_sub='kafka-client');



I am able to get authenticated with the broker , but in the client callback
I am getting an Unsupported Callback error . I have 3 modules in wildfly

1) kafka producer consumer code dependent on the 2) oauth jar (for
logincallbackhandler and login module)  dependent on the 3) kafka-client
jar (2.8.0)]



I can see that the CLIENT CALL BACK IS CLIENTCREDENTIAL INSTEAD OF
OAuthBearerTokenCallback. The saslclient is getting set as
AbstractSaslClient instead of OAuthBearerSaslClient.



Can I get any pointers on this one ?



LOGS



rg.apache.kafka.common.errors.SaslAuthenticationException: An error:
(java.security.PrivilegedActionException:
javax.security.sasl.SaslException: ELY05176: Unsupported callback [Caused
by javax.security.auth.callback.UnsupportedCallbackException]) occurred
when evaluating SASL token received from the Kafka Broker. Kafka Client
will go to AUTHENTICATION_FAILED state.

Caused by: javax.security.sasl.SaslException: ELY05176: Unsupported
callback [Caused by
javax.security.auth.callback.UnsupportedCallbackException]

                at
org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.mechanism.oauth2.OAuth2Client.getInitialResponse(OAuth2Client.java:58)

                at
org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.sasl.oauth2.OAuth2SaslClient.evaluateMessage(OAuth2SaslClient.java:62)

                at
org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.sasl.util.AbstractSaslParticipant.evaluateMessage(AbstractSaslParticipant.java:219)

                at
org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.sasl.util.AbstractSaslClient.evaluateChallenge(AbstractSaslClient.java:98)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslToken$1(SaslClientAuthenticator.java:534)

                at
java.base/java.security.AccessController.doPrivileged(Native Method)

                at
java.base/javax.security.auth.Subject.doAs(Subject.java:423)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:534)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:433)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendInitialToken(SaslClientAuthenticator.java:332)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:273)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:181)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:543)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.common.network.Selector.poll(Selector.java:481)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:480)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)

                at
org.apache.kafka.clients@1.1.8.1//org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)

                at
com.lgc.common.core//com.lgc.dsl.notifications.consumer.DataChangeNoticeKafkaConsumer.poll(DataChangeNoticeKafkaConsumer.java:388)

                at
com.lgc.common.core//com.lgc.dsds.notifications.producer.DataChangeNotificationProducer.consumeNotification(DataChangeNotificationProducer.java:204)

                at
com.lgc.common.core//com.lgc.dsds.notifications.producer.DataChangeNotificationProducer.retrieveAndProcessNotificationObject(DataChangeNotificationProducer.java:106)

                at
com.lgc.common.core//com.lgc.dsds.notifications.producer.DataChangeNotificationProducer.run(DataChangeNotificationProducer.java:75)

                at java.base/java.lang.Thread.run(Thread.java:834)

Caused by:
javax.security.auth.callback.UnsupportedCallbackException


                at
com.lgc.common.koauth//com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler.handle(AuthOBearerSaslClientCallbackHandler.java:91)

                at
org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.mechanism._private.MechanismUtil.handleCallbacks(MechanismUtil.java:156)

                at
org.wildfly.security.elytron-private@1.11.4.Final//org.wildfly.security.mechanism.oauth2.OAuth2Client.getInitialResponse(OAuth2Client.java:56)

                ... 27 more





*LOGS WHERE THE SSL HANDSHAKE IS SUCCESSFUL ,  THE CLIENT CALL BACK IS
CLIENTCREDENTIAL INSTEAD OF OAuthBearerTokenCallback. The saslclient is
getting set as AbstractSaslClient instead of OAuthBearerSaslClient*



21-08-29 16:21:25,756 DEBUG [io.undertow.request] (management I/O-1)
Upgrading request HttpServerExchange{ GET /}

2021-08-29 16:21:25,760 DEBUG
[org.apache.kafka.common.network.SslTransportLayer]
(OWNotificationProducer) [SslTransportLayer channelId=-1
key=channel=java.nio.channels.SocketChannel[connection-pending
remote=i-10-134-194-96/10.134.194.96:9093],
selector=sun.nio.ch.EPollSelectorImpl@50326a63, interestOps=8, readyOps=0]
SSL handshake completed successfully with peerHost 'i-10-134-194-96'
peerPort 9093 peerPrincipal 'CN=i-10-134-194-96, OU=Foo, O=acme corp,
L=Duckburg, ST=Duckburg, C=WD' cipherSuite
'TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384'

2021-08-29 16:21:25,765 DEBUG
[org.apache.kafka.common.security.authenticator.SaslClientAuthenticator]
(OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4,
groupId=OpenWorksConsumer] Set SASL client state to
RECEIVE_APIVERSIONS_RESPONSE

2021-08-29 16:21:25,766 DEBUG
[org.apache.kafka.common.security.authenticator.SaslClientAuthenticator]
(OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4,
groupId=OpenWorksConsumer] Set SASL client state to SEND_HANDSHAKE_REQUEST

2021-08-29 16:21:25,767 DEBUG [org.apache.kafka.clients.NetworkClient]
(kafka-producer-network-thread | CommonKafkaProducer) [Producer
clientId=CommonKafkaProducer] Give up sending metadata request since no
node is available

2021-08-29 16:21:25,767 DEBUG
[org.apache.kafka.common.security.authenticator.SaslClientAuthenticator]
(OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4,
groupId=OpenWorksConsumer] Set SASL client state to
RECEIVE_HANDSHAKE_RESPONSE

2021-08-29 16:21:25,768 DEBUG
[org.apache.kafka.common.security.authenticator.SaslClientAuthenticator]
(OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4,
groupId=OpenWorksConsumer] Set SASL client state to INITIAL

2021-08-29 16:21:25,769 INFO
[com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler]
(OWNotificationProducer) The class loaders are as follows ************
Callbackclienthandler class
com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler
ModuleClassLoader for Module "com.lgc.common.koauth" from local module
loader @6253c26 (finder: local module finder @49049a04 (roots:
/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))

2021-08-29 16:21:25,778 INFO
[com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler]
(OWNotificationProducer) The class loaders are as follows ************
OAuthBearerTokenCallback class
org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback
ModuleClassLoader for Module "org.apache.kafka.clients" version 1.1.8.1
from local module loader @6253c26 (finder: local module finder @49049a04
(roots:
/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))

2021-08-29 16:21:25,787 INFO
[com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler]
(OWNotificationProducer) The class loaders are as follows ************
OAuthBearerLoginModule class
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
ModuleClassLoader for Module "org.apache.kafka.clients" version 1.1.8.1
from local module loader @6253c26 (finder: local module finder @49049a04
(roots:
/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))

2021-08-29 16:21:25,800 DEBUG
[org.jboss.jca.core.connectionmanager.pool.validator.ConnectionValidator]
(ConnectionValidator) Notifying pools, interval: 500

2021-08-29 16:21:25,800 DEBUG
[org.jboss.jca.core.connectionmanager.pool.strategy.PoolBySubject]
(ConnectionValidator) Checking for connection within frequency

2021-08-29 16:21:25,796 INFO
[com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler]
(OWNotificationProducer) ******************* The callbacls are
org.wildfly.security.auth.callback.CredentialCallback@2b29cf23 ClassName
class
org.wildfly.security.auth.callback.CredentialCallbackModuleClassLoader for
Module "org.wildfly.security.elytron-private" version 1.11.4.Final from
local module loader @6253c26 (finder: local module finder @49049a04 (roots:
/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/dv,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/layers/base,/opt/Landmark/DSIntegrationServer10ep.5.0/ApplicationServer/modules/system/add-ons/keycloak))

2021-08-29 16:21:25,803 DEBUG
[org.jboss.jca.core.connectionmanager.pool.strategy.PoolBySubject]
(ConnectionValidator) Returning for connection within frequency

2021-08-29 16:21:25,803 DEBUG
[org.jboss.jca.core.connectionmanager.pool.strategy.PoolBySubject]
(ConnectionValidator) Checking for connection within frequency

2021-08-29 16:21:25,805 INFO  [org.apache.kafka.common.network.Selector]
(OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4,
groupId=OpenWorksConsumer] Failed authentication with i-10-134-194-96/
10.134.194.96 (An error: (java.security.PrivilegedActionException:
javax.security.sasl.SaslException: ELY05176: Unsupported callback [Caused
by javax.security.auth.callback.UnsupportedCallbackException]) occurred
when evaluating SASL token received from the Kafka Broker. Kafka Client
will go to AUTHENTICATION_FAILED state.)

2021-08-29 16:21:25,809 DEBUG [org.apache.kafka.clients.NetworkClient]
(OWNotificationProducer) [Consumer clientId=consumer-OpenWorksConsumer-4,
groupId=OpenWorksConsumer] Node -1 disconnected.





*Config *



2021-08-29 16:21:25,119 INFO
[org.apache.kafka.clients.consumer.ConsumerConfig] (OWNotificationProducer)
ConsumerConfig values:

                allow.auto.create.topics = true

                auto.commit.interval.ms = 5000

                auto.offset.reset = latest

                bootstrap.servers = [i-10-134-194-96:9093]

                check.crcs = true

                client.dns.lookup = use_all_dns_ips

                client.id = consumer-OpenWorksConsumer-2

                client.rack =

                connections.max.idle.ms = 540000

                default.api.timeout.ms = 60000

                enable.auto.commit = false

                exclude.internal.topics = true

                fetch.max.bytes = 52428800

                fetch.max.wait.ms = 500

                fetch.min.bytes = 1

                group.id = OpenWorksConsumer

                group.instance.id = null

                heartbeat.interval.ms = 3000

                interceptor.classes = []

                internal.leave.group.on.close = true

                internal.throw.on.fetch.stable.offset.unsupported = false

                isolation.level = read_uncommitted

                key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer

                max.partition.fetch.bytes = 1048576

                max.poll.interval.ms = 330000

                max.poll.records = 100

                metadata.max.age.ms = 300000

                metric.reporters = []

                metrics.num.samples = 2

                metrics.recording.level = INFO

                metrics.sample.window.ms = 30000

                partition.assignment.strategy = [class
org.apache.kafka.clients.consumer.RangeAssignor]

                receive.buffer.bytes = 65536

                reconnect.backoff.max.ms = 1000

                reconnect.backoff.ms = 50

                request.timeout.ms = 30000

                retry.backoff.ms = 100

                sasl.client.callback.handler.class = class
com.oauth2.security.oauthbearer.AuthOBearerSaslClientCallbackHandler

                sasl.jaas.config = [hidden]

                sasl.kerberos.kinit.cmd = /usr/bin/kinit

                sasl.kerberos.min.time.before.relogin = 60000

                sasl.kerberos.service.name = null

                sasl.kerberos.ticket.renew.jitter = 0.05

                sasl.kerberos.ticket.renew.window.factor = 0.8

                sasl.login.callback.handler.class = class
com.oauth2.security.oauthbearer.OAuthAuthenticateLoginCallbackHandler

                sasl.login.class = null

                sasl.login.refresh.buffer.seconds = 300

                sasl.login.refresh.min.period.seconds = 60

                sasl.login.refresh.window.factor = 0.8

                sasl.login.refresh.window.jitter = 0.05

                sasl.mechanism = OAUTHBEARER

                security.protocol = SASL_SSL

                security.providers = null

                send.buffer.bytes = 131072

                session.timeout.ms = 60000

                socket.connection.setup.timeout.max.ms = 30000

                socket.connection.setup.timeout.ms = 10000

                ssl.cipher.suites = null

                ssl.enabled.protocols = [TLSv1.2]

                ssl.endpoint.identification.algorithm =

                ssl.engine.factory.class = null

                ssl.key.password = null

                ssl.keymanager.algorithm = SunX509

                ssl.keystore.certificate.chain = null

                ssl.keystore.key = null

                ssl.keystore.location = null

                ssl.keystore.password = null

                ssl.keystore.type = JKS

                ssl.protocol = TLSv1.2

                ssl.provider = null

                ssl.secure.random.implementation = null

                ssl.trustmanager.algorithm = PKIX

                ssl.truststore.certificates = null

                ssl.truststore.location =
/opt/Landmark/new_certs/securityserver.keystore

                ssl.truststore.password = null

                ssl.truststore.type = JKS

                value.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer



Regards,

Shankar

Reply via email to