RivenSun created KAFKA-13422:
--------------------------------

             Summary: Even if the correct username and password are configured, 
when ClientBroker or KafkaClient tries to establish a SASL connection to 
ServerBroker, an exception is thrown: (Authentication failed: Invalid username 
or password)
                 Key: KAFKA-13422
                 URL: https://issues.apache.org/jira/browse/KAFKA-13422
             Project: Kafka
          Issue Type: Bug
          Components: clients, core
    Affects Versions: 3.0.0, 2.7.1
            Reporter: RivenSun
         Attachments: CustomerAuthCallbackHandler.java, 
LoginContext_login_debug.png, SaslClientCallbackHandler_handle_debug.png

 
h1. Foreword:

While deploying a Kafka cluster on a newer version (2.7.1), I ran into 
identity authentication failures in inter-broker communication. The problem 
can also be reproduced on the current latest version, 3.0.0.
h1. Reproducing the problem:
h2. 1)Broker version is 3.0.0
h3. The content of kafka_server_jaas.conf is exactly the same on every broker 
and is as follows:

 

{{KafkaServer \{

  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
  user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
  user_alice="alice";

  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin_scram"
  password="admin_scram_password";

 
};}}

 
h3. broker server.properties:

The configuration file of one broker is shown below. The configuration files 
of the other brokers differ only in the localPublicIp value of 
advertised.listeners.

 

{{broker.id=1
broker.rack=us-east-1a
advertised.listeners=SASL_PLAINTEXT://localPublicIp:9779,SASL_SSL://localPublicIp:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://localPublicIp:9669
log.dirs=/asyncmq/kafka/data_1,/asyncmq/kafka/data_2
zookeeper.connect=***
listeners=SASL_PLAINTEXT://:9779,SASL_SSL://:9889,INTERNAL_SSL://:9009,PLAIN_PLUGIN_SSL://:9669
listener.security.protocol.map=INTERNAL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,PLAIN_PLUGIN_SSL:SASL_SSL
listener.name.plain_plugin_ssl.plain.sasl.server.callback.handler.class=org.apache.kafka.common.security.plain.internals.PlainServerCallbackHandler

#ssl config
ssl.keystore.password=***
ssl.key.password=***
ssl.truststore.password=***
ssl.keystore.location=***
ssl.truststore.location=***
ssl.client.auth=none
ssl.endpoint.identification.algorithm=

#broker communicate config
#security.inter.broker.protocol=SASL_PLAINTEXT
inter.broker.listener.name=INTERNAL_SSL
sasl.mechanism.inter.broker.protocol=PLAIN

#sasl authentication config
sasl.kerberos.service.name=kafka
sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI
delegation.token.master.key=***
delegation.token.expiry.time.ms=86400000
delegation.token.max.lifetime.ms=3153600000000}}

 

Then start all brokers at the same time. Each broker starts successfully, but 
authentication fails every time a connection is attempted between the 
controller node and the brokers. Because connections between brokers cannot be 
established, the entire Kafka cluster is unable to provide external services.
h3. The server log repeatedly prints authentication failures:

The brokers' real IP addresses are sensitive, so they are replaced with ****** 
in the log below

 

{{[2021-10-29 14:16:19,831] INFO [SocketServer listenerType=ZK_BROKER, 
nodeId=3] Started socket server acceptors and processors 
(kafka.network.SocketServer)
[2021-10-29 14:16:19,836] INFO Kafka version: 3.0.0 
(org.apache.kafka.common.utils.AppInfoParser)
[2021-10-29 14:16:19,836] INFO Kafka commitId: 8cb0a5e9d3441962 
(org.apache.kafka.common.utils.AppInfoParser)
[2021-10-29 14:16:19,836] INFO Kafka startTimeMs: 1635516979831 
(org.apache.kafka.common.utils.AppInfoParser)
[2021-10-29 14:16:19,837] INFO [KafkaServer id=3] started 
(kafka.server.KafkaServer)

[2021-10-29 14:16:20,249] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] 
Failed authentication with /****** (Authentication failed: Invalid username or 
password) (org.apache.kafka.common.network.Selector)
[2021-10-29 14:16:20,680] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] 
Failed authentication with /****** (Authentication failed: Invalid username or 
password) (org.apache.kafka.common.network.Selector)
[2021-10-29 14:16:21,109] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] 
Failed authentication with /****** (Authentication failed: Invalid username or 
password) (org.apache.kafka.common.network.Selector)}}

 
h2. 2)Try changing the PlainLoginModule password used for communication between 
brokers in kafka_server_jaas.conf

The change is kJTVDziatPgjXG82sFHc4O1EIuewmlvS --> DziatPgjXG82sFHc4O1EIuewmlvS

 

{{KafkaServer \{

  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="DziatPgjXG82sFHc4O1EIuewmlvS"
  user_admin="DziatPgjXG82sFHc4O1EIuewmlvS"
  user_alice="alice";

  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin_scram"
  password="admin_scram_password";

 
};}}

 

Restart all brokers. Connections between the controller and all brokers are 
now established normally, which can be verified with the 
netstat -anp | grep 9009 command

Real IP addresses are sensitive, so they are replaced with ******

 

{{[root@ip-10-30-0-64 kafka]# netstat -anp | grep 9009
tcp6       0      0 :::9009                 :::*                    LISTEN      
24852/java          
tcp6       0      0 ******:47502        ******:9009         ESTABLISHED 
24852/java          
tcp6       0      0 ******:9009         ******:41164        ESTABLISHED 
24852/java          
tcp6       0      0 ******:9009         ******:41168        ESTABLISHED 
24852/java }}

The entire cluster can provide external services, which can be verified by 
creating a topic through the script bin/kafka-topics.sh.
h1. Preliminary guesses:

1)Does the admin password kJTVDziatPgjXG82sFHc4O1EIuewmlvS contain special 
characters that Kafka does not allow in passwords?

2)Does the content of the kafka_server_jaas.conf file deviate from the 
standard format described on the official Kafka website?

3)Do the line endings in kafka_server_jaas.conf differ from the newline 
character used on Linux systems?

After research and analysis, none of these guesses turned out to be correct:

1)kJTVDziatPgjXG82sFHc4O1EIuewmlvS does not contain any special characters that 
Kafka disallows in passwords

2)The content of the kafka_server_jaas.conf file conforms to the standard 
format of the official Kafka website, please refer 
to [https://kafka.apache.org/documentation/#security_sasl]

3)Opening the kafka_server_jaas.conf file in vim and running the :set list 
command shows that every line ends with the standard Linux newline character

 

To narrow down the cause, consider that the log keeps printing "Invalid 
username or password": what username & password does ClientBroker actually 
send, and what username & password does ServerBroker expect?

 
h1. Implementing sasl.server.callback.handler.class

 

Following the 
[sasl.server.callback.handler.class|https://kafka.apache.org/documentation/#brokerconfigs_sasl.server.callback.handler.class]
 parameter, I wrote an implementation of the AuthenticateCallbackHandler 
interface: CustomerAuthCallbackHandler.

The code of CustomerAuthCallbackHandler is almost exactly the same as Kafka's 
default implementation class PlainServerCallbackHandler, except that a 
temporary log statement is added to the original authenticate method. The 
complete code is included in the attachment "CustomerAuthCallbackHandler.java"
 
 

{{private final static Logger log = 
LoggerFactory.getLogger(CustomerAuthCallbackHandler.class);

......

protected boolean authenticate(String username, char[] password) throws 
IOException \{
        if (username == null)
            return false;
        else {
            String expectedPassword = 
JaasContext.configEntryOption(jaasConfigEntries,
                    JAAS_USER_PREFIX + username,
                    PlainLoginModule.class.getName());
            boolean authenticateSuccess = expectedPassword != null && 
Utils.isEqualConstantTime(password, expectedPassword.toCharArray());
            log.info("CustomerAuthCallbackHandler authenticate [{}] | user [{}] 
password is [{}] , expectedPassword is [{}] ", authenticateSuccess, username, 
new String(password), expectedPassword);
            return authenticateSuccess;
        }
    }}}

 

Add one line of configuration to each broker's server.properties file:

 

{{listener.name.internal_ssl.plain.sasl.server.callback.handler.class=us.zoom.mq.security.plain.CustomerAuthCallbackHandler}}

Roll back the admin password in kafka_server_jaas.conf, so that the content is 
as follows:

 

{{KafkaServer \{

  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
  user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
  user_alice="alice";

  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin_scram"
  password="admin_scram_password";

 
};}}

Restart all brokers and observe the log:

The brokers' real IP addresses are sensitive, so they are replaced with ****** 
in the log below

 

{{[2021-10-29 15:31:09,886] INFO [SocketServer listenerType=ZK_BROKER, 
nodeId=3] Started data-plane acceptor and processor(s) for endpoint : 
ListenerName(SASL_PLAINTEXT) (kafka.network.SocketServer)
[2021-10-29 15:31:09,925] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] 
Started data-plane acceptor and processor(s) for endpoint : 
ListenerName(PLAIN_PLUGIN_SSL) (kafka.network.SocketServer)
[2021-10-29 15:31:09,926] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] 
Started socket server acceptors and processors (kafka.network.SocketServer)
[2021-10-29 15:31:09,932] INFO Kafka version: 3.0.0 
(org.apache.kafka.common.utils.AppInfoParser)
[2021-10-29 15:31:09,932] INFO Kafka commitId: 8cb0a5e9d3441962 
(org.apache.kafka.common.utils.AppInfoParser)
[2021-10-29 15:31:09,932] INFO Kafka startTimeMs: 1635521469926 
(org.apache.kafka.common.utils.AppInfoParser)
[2021-10-29 15:31:09,933] INFO [KafkaServer id=3] started 
(kafka.server.KafkaServer)
[2021-10-29 15:31:10,305] INFO CustomerAuthCallbackHandler authenticate [false] 
| user [admin] password is [admin_scram_password] , expectedPassword is 
[kJTVDziatPgjXG82sFHc4O1EIuewmlvS]  
(us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
[2021-10-29 15:31:10,306] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] 
Failed authentication with /****** (Authentication failed: Invalid username or 
password) (org.apache.kafka.common.network.Selector)
[2021-10-29 15:31:10,734] INFO CustomerAuthCallbackHandler authenticate [false] 
| user [admin] password is [admin_scram_password] , expectedPassword is 
[kJTVDziatPgjXG82sFHc4O1EIuewmlvS]  
(us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
[2021-10-29 15:31:10,735] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] 
Failed authentication with /****** (Authentication failed: Invalid username or 
password) (org.apache.kafka.common.network.Selector)
[2021-10-29 15:31:11,165] INFO CustomerAuthCallbackHandler authenticate [false] 
| user [admin] password is [admin_scram_password] , expectedPassword is 
[kJTVDziatPgjXG82sFHc4O1EIuewmlvS]  
(us.zoom.mq.security.plain.CustomerAuthCallbackHandler)
[2021-10-29 15:31:11,165] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] 
Failed authentication with /****** (Authentication failed: Invalid username or 
password) (org.apache.kafka.common.network.Selector)
[2021-10-29 15:31:11,596] INFO CustomerAuthCallbackHandler authenticate [false] 
| user [admin] password is [admin_scram_password] , expectedPassword is 
[kJTVDziatPgjXG82sFHc4O1EIuewmlvS]  
(us.zoom.mq.security.plain.CustomerAuthCallbackHandler)}}

 

The log shows clearly why authentication fails: when a connection between 
brokers is established, the username sent by ClientBroker is correct, but the 
password does not match expectedPassword. The password that was sent is the 
one configured for ScramLoginModule in the kafka_server_jaas.conf file.

At this point we only know that the password sent by ClientBroker is wrong, 
not why it is wrong. To find the root cause, we have to read the source code 
of the Kafka server startup process.

 
h1. Kafka server startup process source code analysis

The startup entry point is the main method of the Scala class kafka.Kafka in 
the core project

KafkaServer's startup() method initializes many key modules, such as 
logManager, socketServer, kafkaController, tokenManager and groupCoordinator

The pieces of code in KafkaServer's startup() method that matter for this 
problem are as follows:

1)socketServer.startup(startProcessingRequests = false)

This method completes the creation of Acceptor and Processors for ControlPlane 
and DataPlane

 

2)socketServer.startProcessingRequests(authorizerFutures)

This method completes the start of Acceptor and Processors for ControlPlane and 
DataPlane

 

3)Further trace to the startAcceptorAndProcessors method in SocketServer

This method will start all Processors threads for each listener.name

See method startProcessors(processors: Seq[Processor], processorThreadPrefix: 
String)

 

4)Analyze configureNewConnections() in the run method of the Processor thread 
class

This method contains one key line of code: 
selector.register(connectionId(channel.socket), channel)

Following the call chain downward:

registerChannel(id, socketChannel, SelectionKey.OP_READ) -->

KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key) -->

SaslChannelBuilder.buildChannel(...)

 

5)The method buildChannel(...) of SaslChannelBuilder

In this method, the value of Mode determines whether buildServerAuthenticator 
or buildClientAuthenticator is called

The most important difference between the two methods is that the former calls 
createSaslServer and the latter calls createSaslClient.

SaslServer is what ServerBroker uses to verify user passwords. For 
PlainLoginModule, the verification logic is in the default implementation 
class PlainServerCallbackHandler

SaslClient is what KafkaClient uses to obtain the user's password

Our focus is therefore buildClientAuthenticator, which means that the instance 
variable Mode mode of SaslChannelBuilder must be CLIENT.

So we go back and trace the constructor of SaslChannelBuilder:

public SaslChannelBuilder(...)


6)The constructor public SaslChannelBuilder(...)

This constructor is only called from the private static ChannelBuilder 
create(...) method of ChannelBuilders.

Tracing further up leads to static ChannelBuilder clientChannelBuilder(...)

At this point you can see that the caller of the clientChannelBuilder(...) 
method is either ClientBroker or the ClientUtils class. The latter is used by 
KafkaProducer/KafkaConsumer/KafkaAdminClient

To analyze the root cause further, I decided to use KafkaProducer to simulate 
a ClientBroker connection request. The underlying mechanisms are almost the 
same; the differences are that some configuration items are supported only by 
ClientBroker and that a different section of the static JAAS configuration is 
used.
h1. Source code analysis of KafkaProducer's SASL authentication process

 

ClientUtils.clientChannelBuilder(…) → ChannelBuilder create(...) → 
SaslChannelBuilder#void configure(Map<String, ?> configs) 

The key operations performed in the methods above are:

1)JaasContext load

If the kafka_Client_jaas.conf file is specified through the 
java.security.auth.login.config system property, the entries are loaded into 
JaasContext through the method getAppConfigurationEntry(String var1) in 
sun.security.provider.ConfigFile

 

{{ public AppConfigurationEntry[] getAppConfigurationEntry(String var1) \{
        return this.spi.engineGetAppConfigurationEntry(var1);
    }}}

 

2)createClientCallbackHandler(Map<String, ?> configs)

The default ClientCallbackHandler implementation class is 
SaslClientCallbackHandler. Its role is to let KafkaClient take the username 
and password from the authentication credentials saved in the Subject that is 
held by the LoginManager of its own Channel. *This is particularly critical, 
and we will analyze it in detail later.*

3)Initialization of LoginManager and loading of LoginContext

Here we directly trace the constructor of LoginManager

private LoginManager(...) → AbstractLogin#login()

There are two key operations in the AbstractLogin#login() method

 

 

{{@Override
    public LoginContext login() throws LoginException \{
        loginContext = new LoginContext(contextName, null, 
loginCallbackHandler, configuration);
        loginContext.login();
        log.info("Successfully logged in.");
        return loginContext;
    }}}

The role of new LoginContext(...) is

(1)Get the configuration variables in JaasContext, where all the credentials of 
kafka_Client_jaas.conf are stored

(2)Complete the initialization of the instance variable moduleStack array

 

loginContext.login() → invokePriv(LOGIN_METHOD) → invoke(String methodName)

(1)These methods update the Object module field of each element of the 
moduleStack array

(2)They obtain all public methods of each LoginModule class through 
reflection

 

 

{{methods = moduleStack[i].module.getClass().getMethods();}}

And get the index of the initialize method in the methods array through the 
INIT_METHOD constant

Then execute the initialize method of each type of LoginModule class through 
reflection

 

{{  methods[mIndex].invoke(moduleStack[i].module, initArgs);}}

This step is very important. In invokePriv(LOGIN_METHOD), LoginContext loops 
over moduleStack and executes the initialize method of every LoginModule class 
you configured. The initialize methods of PlainLoginModule and 
ScramLoginModule each add the username&password configured for them in 
kafka_Client_jaas.conf to the Subject of the LoginManager that corresponds to 
the Channel. The two fields of Subject that store the username and password 
are both of type SecureSet, and SecureSet stores its elements in a LinkedList. 
*So the key point: the elements of the two Credentials sets in the Subject are 
added in the order of the LoginModules in kafka_Client_jaas.conf*
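The accumulation described above can be illustrated with the JDK alone. This is a minimal sketch, not Kafka code: the class name SubjectAccumulationSketch, its helper methods and the placeholder credentials are all hypothetical, and addCredentials only imitates what the issue says the initialize methods of the two login modules do.

```java
import java.util.ArrayList;
import java.util.List;
import javax.security.auth.Subject;

// Sketch: two login modules writing their credentials into one shared Subject.
class SubjectAccumulationSketch {

    // Imitates a login module's initialize(): store the configured
    // username as a public credential and the password as a private one.
    static void addCredentials(Subject subject, String username, String password) {
        subject.getPublicCredentials().add(username);
        subject.getPrivateCredentials().add(password);
    }

    // Returns the private credentials in the iteration order of the raw
    // (untyped) credential set after both "modules" have run.
    static List<Object> privateCredentialsInStoredOrder() {
        Subject subject = new Subject();
        addCredentials(subject, "admin", "plain-password");        // first module in the JAAS file
        addCredentials(subject, "admin_scram", "scram-password");  // second module in the JAAS file
        List<Object> stored = new ArrayList<>();
        for (Object credential : subject.getPrivateCredentials()) {
            stored.add(credential);
        }
        return stored;
    }

    public static void main(String[] args) {
        System.out.println(privateCredentialsInStoredOrder());
    }
}
```

The raw set keeps both passwords, in the order in which the modules added them. Nothing in the Subject records which password belongs to which mechanism.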

4)Let's go back and analyze the SaslClientCallbackHandler mentioned in the 
second step above

Where is this class used? It is used when KafkaClient is about to establish a 
connection: ClientFactoryImpl#createSaslClient(...) uses 
SaslClientCallbackHandler.

The call stack is:

NetworkClient#initiateConnect(Node node, long now)

→

Selector#connect(String id, InetSocketAddress address, int sendBufferSize, int 
receiveBufferSize)

-->

Selector#registerChannel(String id, SocketChannel socketChannel, int 
interestedOps)

-->

Selector#buildAndAttachKafkaChannel(SocketChannel socketChannel, String id, 
SelectionKey key)

-->

SaslChannelBuilder#buildChannel(...) –> 
SaslChannelBuilder#buildClientAuthenticator

-->

SaslClientAuthenticator#SaslClient createSaslClient()

-->

Sasl.createSaslClient(...)

-->

ClientFactoryImpl's createSaslClient(...). The source code of this method is as 
follows:

 

{{public SaslClient createSaslClient(String[] var1, String var2, String var3, 
String var4, Map<String, ?> var5, CallbackHandler var6) throws SaslException \{
    for(int var7 = 0; var7 < var1.length; ++var7) {
        if (var1[var7].equals(myMechs[0]) && 
PolicyUtils.checkPolicy(mechPolicies[0], var5)) {
            return new ExternalClient(var2);
        }

        Object[] var8;
        if (var1[var7].equals(myMechs[1]) && 
PolicyUtils.checkPolicy(mechPolicies[1], var5)) \{
            var8 = this.getUserInfo("CRAM-MD5", var2, var6);
            return new CramMD5Client((String)var8[0], 
(byte[])((byte[])var8[1]));
        }

        if (var1[var7].equals(myMechs[2]) && 
PolicyUtils.checkPolicy(mechPolicies[2], var5)) \{
            var8 = this.getUserInfo("PLAIN", var2, var6);
            return new PlainClient(var2, (String)var8[0], 
(byte[])((byte[])var8[1]));
        }
    }

    return null;
}}}

You can see that the CallbackHandler passed in from the upper layer is the 
parameter var6; it is in fact Kafka's own SaslClientCallbackHandler. Looking 
at the source code of getUserInfo, you can clearly see that a NameCallback and 
a PasswordCallback are constructed and that the handle method of 
SaslClientCallbackHandler is executed: var3.handle(new Callback[]\{var6, var7});

 

{{private Object[] getUserInfo(String var1, String var2, CallbackHandler var3) 
throws SaslException \{
        if (var3 == null) {
            throw new SaslException("Callback handler to get username/password 
required");
        } else \{
            try {
                String var4 = var1 + " authentication id: ";
                String var5 = var1 + " password: ";
                NameCallback var6 = var2 == null ? new NameCallback(var4) : new 
NameCallback(var4, var2);
                PasswordCallback var7 = new PasswordCallback(var5, false);
                var3.handle(new Callback[]{var6, var7});
                char[] var8 = var7.getPassword();
                byte[] var9;
                if (var8 != null) \{
                    var9 = (new String(var8)).getBytes("UTF8");
                    var7.clearPassword();
                } else \{
                    var9 = null;
                }

                String var10 = var6.getName();
                return new Object[]\{var10, var9};
            } catch (IOException var11) \{
                throw new SaslException("Cannot get password", var11);
            } catch (UnsupportedCallbackException var12) \{
                throw new SaslException("Cannot get userid/password", var12);
            }
        }
    }}}
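The effect of getUserInfo can be observed with the JDK's own PLAIN client. The following is a minimal sketch under stated assumptions: PlainTokenSketch and initialResponse are hypothetical names, the protocol and server name are placeholders, and the handler is a stand-in for SaslClientCallbackHandler that simply returns fixed values. It relies on the JDK's built-in SASL provider offering a PLAIN client.

```java
import java.nio.charset.StandardCharsets;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.PasswordCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;

// Sketch: the PLAIN token contains exactly what the callback handler returns.
class PlainTokenSketch {

    // Builds the initial PLAIN response, with NUL separators shown as '|'.
    static String initialResponse(String username, String password) {
        CallbackHandler handler = callbacks -> {
            for (Callback callback : callbacks) {
                if (callback instanceof NameCallback) {
                    ((NameCallback) callback).setName(username);
                } else if (callback instanceof PasswordCallback) {
                    ((PasswordCallback) callback).setPassword(password.toCharArray());
                } else {
                    throw new UnsupportedCallbackException(callback);
                }
            }
        };
        try {
            // "kafka" and "broker.example" are placeholder protocol and server names.
            SaslClient client = Sasl.createSaslClient(
                    new String[] {"PLAIN"}, null, "kafka", "broker.example", null, handler);
            if (client == null) {
                throw new IllegalStateException("No PLAIN SaslClient available in this JDK");
            }
            byte[] token = client.evaluateChallenge(new byte[0]);
            return new String(token, StandardCharsets.UTF_8).replace('\0', '|');
        } catch (SaslException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(initialResponse("admin", "admin_scram_password"));
    }
}
```

The client has no way to check that the password belongs to the PLAIN mechanism. Whatever the handler places in the PasswordCallback is what ends up in the token sent to the broker.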

 

The most important part of SaslClientCallbackHandler is its handle(Callback[] 
callbacks) method, which takes the username and password out of the Subject

The source code shows that when Subject returns the elements of a given class 
from either type of Credentials, it copies the matching elements of the 
Credentials SecureSet into a HashSet. The handler then calls 
HashSet<String>.iterator().next() to take the first element of each HashSet 
and uses it as the username or password in the corresponding Callback

 

{{ for (Callback callback : callbacks) \{
            if (callback instanceof NameCallback) {
                NameCallback nc = (NameCallback) callback;
                if (subject != null && 
!subject.getPublicCredentials(String.class).isEmpty()) {
                    
nc.setName(subject.getPublicCredentials(String.class).iterator().next());
                } else
                    nc.setName(nc.getDefaultName());
            } else if (callback instanceof PasswordCallback) \{
                if (subject != null && 
!subject.getPrivateCredentials(String.class).isEmpty()) {
                    char[] password = 
subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
                    ((PasswordCallback) callback).setPassword(password);
                } else \{
                    String errorMessage = "Could not login: the client is being 
asked for a password, but the Kafka" +
                             " client code does not currently support obtaining 
a password from the user.";
                    throw new UnsupportedCallbackException(callback, 
errorMessage);
                }
            }
            
      .......
      }}}

*The key point is: when KafkaClient takes the username and password from the 
Subject, it always takes the first element of the HashSet<String> built from 
each Credentials set, and the iteration order of the elements in the 
HashSet<String> depends on each element's hash value.*
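The lookup pattern used by the handler can be tried in isolation. This is a minimal sketch with hypothetical names (TypedCredentialOrderSketch, firstFromSubject, firstFromHashSet). It assumes the OpenJDK behaviour described above, where the typed credential view is backed by a HashSet, and compares the Subject lookup with a plain HashSet filled in the same order.

```java
import java.util.HashSet;
import java.util.Set;
import javax.security.auth.Subject;

// Sketch: which password does iterator().next() on the typed view return?
class TypedCredentialOrderSketch {

    // Same lookup pattern as the handler: first String among the private credentials.
    static String firstFromSubject(String firstAdded, String secondAdded) {
        Subject subject = new Subject();
        subject.getPrivateCredentials().add(firstAdded);
        subject.getPrivateCredentials().add(secondAdded);
        return subject.getPrivateCredentials(String.class).iterator().next();
    }

    // First element of a plain HashSet filled in the same order.
    static String firstFromHashSet(String firstAdded, String secondAdded) {
        Set<String> set = new HashSet<>();
        set.add(firstAdded);
        set.add(secondAdded);
        return set.iterator().next();
    }

    public static void main(String[] args) {
        String plainPassword = "kJTVDziatPgjXG82sFHc4O1EIuewmlvS";
        String scramPassword = "admin_scram_password";
        System.out.println("Subject view picks: " + firstFromSubject(plainPassword, scramPassword));
        System.out.println("HashSet picks:      " + firstFromHashSet(plainPassword, scramPassword));
    }
}
```

Both lookups agree with each other, and the result is decided by hash order rather than by the order in which the login modules stored their passwords.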
h1. Debugging the KafkaProducer SASL authentication process

 
h2. Preconditions:
h3. 1)kafka_server_jaas.conf configuration:

All broker machines use this configuration for 
java.security.auth.login.config. The brokers start normally, the Kafka cluster 
is healthy, and it can provide services to external clients

 

{{KafkaServer \{

  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
  user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
  user_alice="alice";

  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin_scram"
  password="admin_scram_password";

 
};}}

 
h3. 2)Key KafkaProducer configuration

 

{{props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "******:9669"); 
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
System.setProperty("java.security.auth.login.config","******\\kafka_Client_jaas.conf");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, 
"******\\client.truststore.jks");
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "******");
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");

KafkaProducer<String, String> producer = new KafkaProducer<String, 
String>(props);}}

 
h3. 3)kafka_Client_jaas.conf file, simulating the JAAS configuration of 
ClientBroker

 

{{KafkaClient \{

  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
  user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
  user_alice="alice";

  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin_scram"
  password="DziatPgjXG82sFHc4O1EIuewmlvS";

 
};}}

 
h3. Start debugging the authentication process
h4. 1)Place a breakpoint on the line LoginSucceeded = true; in the login() 
method of LoginContext

For the debugger screenshot, see LoginContext_login_debug
 
We can see: *The elements of the two Credentials sets in the Subject are added 
in the order of the LoginModules in kafka_Client_jaas.conf*

 
h4. 2)Place a breakpoint in handle(Callback[] callbacks) of 
SaslClientCallbackHandler


For the debugger screenshot, see SaslClientCallbackHandler_handle_debug
 
You can see that the password character array of PasswordCallback here starts 
with “DziatPgjXG”, which corresponds to the password of ScramLoginModule in 
the kafka_Client_jaas.conf file: "DziatPgjXG82sFHc4O1EIuewmlvS"

 
h4. 3)Remove all breakpoints and run the KafkaProducer program

The Producer log is shown below

The brokers' real IP addresses are sensitive, so they are replaced with ****** 
in the log below

 

{{[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - 
Successfully logged in.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 
8cb0a5e9d3441962
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 
1635534803428
[kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
Bootstrap broker ******:9669 (id: -3 rack: null) disconnected
[kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
Bootstrap broker ******:9669 (id: -1 rack: null) disconnected
[kafka-producer-network-thread | producer-1] INFO 
org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] 
Failed authentication with ******/****** (Authentication failed: Invalid 
username or password)
[kafka-producer-network-thread | producer-1] ERROR 
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
Connection to node -2 (******/******:9669) failed authentication due to: 
Authentication failed: Invalid username or password
[kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
Bootstrap broker ******:9669 (id: -2 rack: null) disconnected
[main] ERROR ProducerTest - the producer has a error:Authentication failed: 
Invalid username or password
[kafka-producer-network-thread | producer-1] INFO 
org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] 
Failed authentication with ******/****** (Authentication failed: Invalid 
username or password)
[kafka-producer-network-thread | producer-1] ERROR 
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
Connection to node -3 (******/******:9669) failed authentication due to: 
Authentication failed: Invalid username or password
[kafka-producer-network-thread | producer-1] WARN 
org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] 
Bootstrap broker ******:9669 (id: -3 rack: null) disconnected
[main] ERROR ProducerTest - the producer has a error:Authentication failed: 
Invalid username or password}}

The Producer log also shows the exception: Authentication failed: Invalid 
username or password

The password that ServerBroker expects for PlainLoginModule is 
“kJTVDziatPgjXG82sFHc4O1EIuewmlvS”, but KafkaProducer sent the wrong one: it 
took the password configured for ScramLoginModule, 
“DziatPgjXG82sFHc4O1EIuewmlvS”

 

 
h1. Root Cause:
h2. 1. The Credentials stored in the Subject contain the username&password of 
all LoginModules in kafka_Client_jaas.conf


The constructor of the JDK's LoginContext class is as follows

 

{{public LoginContext(String name, Subject subject,
                        CallbackHandler callbackHandler,
                        Configuration config) throws LoginException \{
        this.config = config;
        if (config != null) {
            creatorAcc = java.security.AccessController.getContext();
        }

        init(name);
       ....
       }}}

When the JAAS configuration is supplied through 
java.security.auth.login.config, the config object here is actually an 
instance of the JDK class sun.security.provider.ConfigFile.

When the init(name); call above is executed, 
config.getAppConfigurationEntry(name) reads all the LoginModules in the 
kafka_Client_jaas.conf file again, and the entries variable is used to size 
the instance variable moduleStack in the following code.

 

{{ // get the LoginModules configured for this application
        AppConfigurationEntry[] entries = config.getAppConfigurationEntry(name);
        if (entries == null) \{

            if (sm != null && creatorAcc == null) {
                sm.checkPermission(new AuthPermission
                                ("createLoginContext." + OTHER));
            }

            entries = config.getAppConfigurationEntry(OTHER);
            if (entries == null) \{
                MessageFormat form = new MessageFormat(ResourcesMgr.getString
                        ("No.LoginModules.configured.for.name"));
                Object[] source = {name};
                throw new LoginException(form.format(source));
            }
        }
        moduleStack = new ModuleInfo[entries.length];}}

*Eventually, the Credentials stored in the Subject contain the 
username & password of all LoginModules in kafka_Client_jaas.conf.*
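That a single context name yields every configured LoginModule can be checked directly against the JDK's file-based JAAS configuration. This is a minimal sketch with hypothetical names (JaasFileEntriesSketch, moduleNames) and placeholder passwords. It writes a two-module KafkaClient section to a temporary file and lists the module names the JDK returns for it. The Kafka module classes do not need to be on the classpath, because only the names are read.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import java.util.ArrayList;
import java.util.List;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Sketch: one JAAS context name returns all of its LoginModule entries.
class JaasFileEntriesSketch {

    static final String JAAS = String.join("\n",
            "KafkaClient {",
            "  org.apache.kafka.common.security.plain.PlainLoginModule required",
            "  username=\"admin\"",
            "  password=\"plain-password\";",
            "",
            "  org.apache.kafka.common.security.scram.ScramLoginModule required",
            "  username=\"admin_scram\"",
            "  password=\"scram-password\";",
            "};",
            "");

    // Parses the JAAS text with the JDK's file-based Configuration and
    // returns the LoginModule class names found under the given context name.
    static List<String> moduleNames(String contextName) {
        try {
            Path file = Files.createTempFile("kafka_client_jaas", ".conf");
            try {
                Files.write(file, JAAS.getBytes(StandardCharsets.UTF_8));
                Configuration config = Configuration.getInstance(
                        "JavaLoginConfig", new URIParameter(file.toUri()));
                AppConfigurationEntry[] entries = config.getAppConfigurationEntry(contextName);
                List<String> names = new ArrayList<>();
                if (entries != null) {
                    for (AppConfigurationEntry entry : entries) {
                        names.add(entry.getLoginModuleName());
                    }
                }
                return names;
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException | NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(moduleNames("KafkaClient"));
    }
}
```

The lookup takes only the context name. It has no parameter for the SASL mechanism, so both modules come back no matter which mechanism the client is configured to use.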
h2. 2. In the JDK's Subject class, the order in which elements are read from 
the two kinds of Credentials may differ from the order in which they were 
stored. The read order depends on the hash index of each element in a HashSet.
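
A minimal sketch of the effect described in this heading, using only the JDK's public Subject API. The class name CredentialOrderDemo and the sample passwords are made up for illustration. It stores two String credentials and reads them back through getPrivateCredentials(String.class). Both values come back, but the Javadoc promises no iteration order for the returned Set, so the first element read is not necessarily the first one stored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.security.auth.Subject;

final class CredentialOrderDemo {

    // Stores the passwords as private credentials, then reads them back the
    // way a caller of getPrivateCredentials(Class) would see them.
    static List<String> storeThenRead(List<String> passwords) {
        Subject subject = new Subject();
        for (String password : passwords) {
            subject.getPrivateCredentials().add(password);
        }
        // The returned Set is a fresh copy; its iteration order is unspecified.
        return new ArrayList<>(subject.getPrivateCredentials(String.class));
    }

    public static void main(String[] args) {
        // Hypothetical values standing in for the PLAIN and SCRAM passwords.
        List<String> stored = Arrays.asList("plain-password", "scram-password");
        System.out.println("stored order: " + stored);
        System.out.println("read order:   " + storeThenRead(stored));
    }
}
```

Whether the two printed orders differ depends on the hash codes of the stored values, which is exactly why relying on the first element is fragile.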

 
h1. Suggestion & Solutions:

 
h2. 1. Modify the constructors of the JaasContext and JaasConfig classes
h3. 1) When KafkaClient initializes Map<String, JaasContext> jaasContexts, the 
constructor of JaasContext needs to receive the clientSaslMechanism configured 
by KafkaClient

clientSaslMechanism is then used to filter the entries returned by 
configuration.getAppConfigurationEntry(name). When clientSaslMechanism = PLAIN, 
the final configurationEntries instance variable should contain only the 
PlainLoginModule content of kafka_Client_jaas.conf; when clientSaslMechanism = 
SCRAM-SHA-256, configurationEntries should contain only the ScramLoginModule 
content.
 *  {{public JaasContext(String name, Type type, Configuration configuration, 
Password dynamicJaasConfig) \{
        this.name = name;
        this.type = type;
        this.configuration = configuration;
        AppConfigurationEntry[] entries = 
configuration.getAppConfigurationEntry(name);
        if (entries == null)
            throw new IllegalArgumentException("Could not find a '" + name + "' 
entry in this JAAS configuration.");
        
        //Add here the code that uses clientSaslMechanism to verify and filter 
entries
        
        this.configurationEntries = Collections.unmodifiableList(new 
ArrayList<>(Arrays.asList(entries)));
        this.dynamicJaasConfig = dynamicJaasConfig;
    }}}
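
The "verify and filter" step left as a comment above could look roughly like the following sketch. It is not taken from Kafka: the class MechanismEntryFilter, the method filter, and the mechanism-to-module mapping are assumptions made for illustration. Only the two login module class names come from the configuration shown in this issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;

final class MechanismEntryFilter {

    // Assumed mapping from SASL mechanism to login module class name.
    private static final Map<String, String> MODULE_BY_MECHANISM = new HashMap<>();
    static {
        MODULE_BY_MECHANISM.put("PLAIN",
                "org.apache.kafka.common.security.plain.PlainLoginModule");
        MODULE_BY_MECHANISM.put("SCRAM-SHA-256",
                "org.apache.kafka.common.security.scram.ScramLoginModule");
        MODULE_BY_MECHANISM.put("SCRAM-SHA-512",
                "org.apache.kafka.common.security.scram.ScramLoginModule");
    }

    // Keeps only the entries whose login module matches the given mechanism.
    static List<AppConfigurationEntry> filter(AppConfigurationEntry[] entries,
                                              String mechanism) {
        String wanted = MODULE_BY_MECHANISM.get(mechanism);
        if (wanted == null) {
            // Unknown mechanism (e.g. a custom one): leave the entries untouched.
            return Collections.unmodifiableList(new ArrayList<>(Arrays.asList(entries)));
        }
        List<AppConfigurationEntry> kept = new ArrayList<>();
        for (AppConfigurationEntry entry : entries) {
            if (wanted.equals(entry.getLoginModuleName())) {
                kept.add(entry);
            }
        }
        if (kept.isEmpty()) {
            throw new IllegalArgumentException(
                    "No login module for mechanism '" + mechanism + "' in this JAAS configuration.");
        }
        return Collections.unmodifiableList(kept);
    }
}
```

The sketch passes unknown mechanisms through unchanged so that custom login modules keep working; a stricter variant could reject them instead.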

This has two advantages:
(1) For mode == Mode.CLIENT, even if the JAAS configuration is supplied through 
java.security.auth.login.config, the JaasContext of KafkaClient (including 
KafkaProducer, KafkaConsumer, etc.) gets the same semantics as the 
ClientBroker-side per-listener, per-mechanism setting 
"listener.name.\{listenerName}.\{saslMechanism}.sasl.jaas.config". See 
[https://kafka.apache.org/documentation/#security_jaas_broker]. 
The goal: in Map<String, JaasContext> jaasContexts, the configurationEntries 
of the JaasContext for each saslMechanism contain only the LoginModule section 
that corresponds to that saslMechanism.
(2) For mode == Mode.SERVER, consider 
SaslChannelBuilder#configure(Map<String, ?> configs):
 {{public void configure(Map<String, ?> configs) throws KafkaException \{
        try {
            this.configs = configs;
            if (mode == Mode.SERVER) {
                createServerCallbackHandlers(configs);
                createConnectionsMaxReauthMsMap(configs);
            } else
                createClientCallbackHandler(configs);
            for (Map.Entry<String, AuthenticateCallbackHandler> entry : 
saslCallbackHandlers.entrySet()) \{
                String mechanism = entry.getKey();
                entry.getValue().configure(configs, mechanism, 
jaasContexts.get(mechanism).configurationEntries());
            }
            ......
            
            }}}

Because the configurationEntries in jaasContexts.get(mechanism) have already 
been verified and filtered, calling entry.getValue().configure(...), and in 
particular PlainServerCallbackHandler#configure(...), leaves the instance 
variable jaasConfigEntries in PlainServerCallbackHandler with only its own 
mechanism's entries. It no longer contains the content of other LoginModules.
h3. 2) The JaasConfig class needs to provide a new constructor

In the constructor of JaasContext, the Configuration configuration parameter is 
always converted into a JaasConfig. This is necessary because we cannot change 
the behavior of the init(String name) method of the JDK's LoginContext class.
 *  {{public JaasConfig(String loginContextName, List<AppConfigurationEntry> 
configurationEntries) \{
this.loginContextName = loginContextName;
if (configurationEntries == null || configurationEntries.size() == 0)
        throw new IllegalArgumentException("JAAS config property does not 
contain any login modules");
this.configEntries = configurationEntries;
}}}

Then, in the constructor of JaasContext, use the verified and filtered 
configurationEntries to rebuild the configuration:
 {{public JaasContext(String name, Type type, Configuration configuration, 
Password dynamicJaasConfig) \{
        this.name = name;
        this.type = type;
        // this.configuration is assigned once, further down
        AppConfigurationEntry[] entries = 
configuration.getAppConfigurationEntry(name);
        if (entries == null)
            throw new IllegalArgumentException("Could not find a '" + name + "' 
entry in this JAAS configuration.");
        
        //Add here the code that uses clientSaslMechanism to verify and filter 
entries
        
        this.configurationEntries = Collections.unmodifiableList(new 
ArrayList<>(Arrays.asList(entries)));
        
        if (configuration instanceof JaasConfig)
            this.configuration = configuration;
        else
            this.configuration = new JaasConfig(name, configurationEntries);
        
        this.dynamicJaasConfig = dynamicJaasConfig;
    }}}
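
To illustrate why wrapping the filtered entries helps, here is a minimal stand-in for such a configuration, written against the JDK's abstract javax.security.auth.login.Configuration class. The class name FixedEntriesConfiguration is hypothetical and this is not Kafka's JaasConfig. It only shows that a LoginContext built on such an object can see nothing but the entries it was given.

```java
import java.util.ArrayList;
import java.util.List;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

// Hypothetical stand-in for a Configuration that exposes a fixed entry list.
final class FixedEntriesConfiguration extends Configuration {

    private final String loginContextName;
    private final List<AppConfigurationEntry> entries;

    FixedEntriesConfiguration(String loginContextName,
                              List<AppConfigurationEntry> entries) {
        if (entries == null || entries.isEmpty()) {
            throw new IllegalArgumentException(
                    "JAAS config does not contain any login modules");
        }
        this.loginContextName = loginContextName;
        // Defensive copy so later changes by the caller are not visible here.
        this.entries = new ArrayList<>(entries);
    }

    // LoginContext.init(name) calls this method; it only ever sees the
    // entries passed to the constructor, never the full JAAS file.
    @Override
    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
        if (!loginContextName.equals(name)) {
            return null;
        }
        return entries.toArray(new AppConfigurationEntry[0]);
    }
}
```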

 
h2. 2. Each type of LoginModule in kafka_Client_jaas.conf should be configured 
only once


For example, loading the following configuration should make the JaasContext 
constructor throw an exception, because PlainLoginModule is configured twice:

 

{{KafkaClient \{

  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
  user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
  user_alice="alice";
  
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin2"
  password="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
  user_admin="kJTVDziatPgjXG82sFHc4O1EIuewmlvS"
  user_tom="tom";

  org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin_scram"
  password="DziatPgjXG82sFHc4O1EIuewmlvS";

 
};}}
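
One possible shape for that check, sketched under the assumption that it runs on the array returned by configuration.getAppConfigurationEntry(name). The class and method names are hypothetical, and the exception type and message are only a suggestion.

```java
import java.util.HashSet;
import java.util.Set;
import javax.security.auth.login.AppConfigurationEntry;

final class DuplicateModuleCheck {

    // Throws if the same login module class appears more than once
    // under a single JAAS context name.
    static void requireUniqueLoginModules(String contextName,
                                          AppConfigurationEntry[] entries) {
        Set<String> seen = new HashSet<>();
        for (AppConfigurationEntry entry : entries) {
            String moduleName = entry.getLoginModuleName();
            // Set.add returns false when the element is already present.
            if (!seen.add(moduleName)) {
                throw new IllegalArgumentException(
                        "Login module " + moduleName
                        + " is configured more than once in '" + contextName + "'.");
            }
        }
    }
}
```

With the configuration above, the second PlainLoginModule entry would trigger the exception, while a file with one PlainLoginModule and one ScramLoginModule would pass.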

 
h2. 3. Discussion of the Subject class in the JDK

When elements are fetched from the Subject class in the JDK, the SecureSet is 
converted to a HashSet. I don't know the JDK's reasoning for this. We could 
file an issue with the JDK asking that the Set returned when fetching elements 
also be a SecureSet (SecureSet → SecureSet).

However, I recommend making the change on the application side, at the Kafka 
level.

First, for both Map<String, JaasContext> jaasContexts and 
Map<String, LoginManager> loginManagers, the key is the mechanism. The value 
for each mechanism should therefore contain only that mechanism's LoginModule 
and nothing from the LoginModules of other mechanisms. This also prevents 
either Credentials set in the Subject from holding more than one element.

Second, the JDK's design of Subject probably attaches no meaning to the 
"first element" of a Credentials set. What the JDK promises is to return every 
element in the Credentials set and to let you modify the returned Set.

See the Javadoc of the getPrivateCredentials method:

 

{{Return a Set of private credentials associated with this Subject that are 
instances or subclasses of the specified Class.
The caller must have permission to access all of the requested Credentials, or 
a SecurityException will be thrown.
The returned Set is not backed by this Subject's internal private Credential 
Set. A new Set is created and returned for each method invocation. 
Modifications to the returned Set will not affect the internal private 
Credential Set.

Params:
c – the returned Set of private credentials will all be instances of this class.
Type parameters:
<T> – the type of the class modeled by c
Returns:
a Set of private credentials that are instances of the specified Class.
Throws:
NullPointerException – if the specified Class is null.
public <T> Set<T> getPrivateCredentials(Class<T> c) }}

However, when Kafka reads from the two Credentials sets in the Subject, it 
relies on "take the first element" semantics, which may conflict with the 
intent behind the JDK's design of Subject: a caller is expected to consider 
all the elements of each Credentials set for authentication.

 

{{subject.getPrivateCredentials(String.class).iterator().next()}}
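
As one illustration of a more defensive read (not something Kafka does today, and not a concrete patch proposal), a caller could refuse to pick an element when the Set holds anything other than exactly one. The helper name requireSingle and its error message are hypothetical.

```java
import java.util.Set;
import javax.security.auth.Subject;

final class SingleCredential {

    // Returns the only element of the set, or fails loudly instead of
    // silently picking whichever element the iterator yields first.
    static <T> T requireSingle(Set<T> credentials, String description) {
        if (credentials.size() != 1) {
            throw new IllegalStateException(
                    "Expected exactly one " + description
                    + " in the Subject but found " + credentials.size());
        }
        return credentials.iterator().next();
    }

    public static void main(String[] args) {
        Subject subject = new Subject();
        subject.getPrivateCredentials().add("only-password"); // hypothetical value
        String password = requireSingle(
                subject.getPrivateCredentials(String.class), "password");
        System.out.println(password);
    }
}
```

Combined with per-mechanism filtering of the login modules, such a guard would turn the ambiguous case into an explicit error rather than an "Invalid username or password" failure.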

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
