[ 
https://issues.apache.org/jira/browse/KAFKA-13556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

joecqupt resolved KAFKA-13556.
------------------------------
      Assignee: joecqupt
    Resolution: Duplicate

> Kafka cluster loses the controller
> ----------------------------------
>
>                 Key: KAFKA-13556
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13556
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.8.1
>            Reporter: Andrei Lakhmanets
>            Assignee: joecqupt
>            Priority: Major
>
> Hi team,
> *Kafka version:* 2.8.1
> *Configuration:* 3 Kafka brokers and 3 ZooKeeper nodes, each placed in a 
> different availability zone.
> I hit a bug where the Kafka cluster loses its controller. If any 
> non-controller broker is restarted after that, it stops processing data.
> *Context:*
> The Kafka cluster uses SASL for client and inter-broker connections.
> {code:java}
> # Listeners config
> listeners=SASL_SSL://[::]:9091,SASL_PLAINTEXT://[::]:9092
> advertised.listeners=SASL_SSL://---DOMAIN---:9091,SASL_PLAINTEXT://---DOMAIN---:9092
> sasl.enabled.mechanisms=SCRAM-SHA-512
> inter.broker.listener.name=SASL_SSL
> sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
> inter.broker.protocol.version=2.8-IV0
> # Keystore config
> ssl.keystore.location=/etc/kafka/ssl/server.keystore.jks
> ssl.keystore.password=---PASSWORD---
> ssl.key.password=---PASSWORD---
> ssl.truststore.location=/etc/kafka/ssl/server.truststore.jks
> ssl.truststore.password=---PASSWORD--- 
> zookeeper.connect=---ZK_HOST_1---:2181,---ZK_HOST_2---:2181,---ZK_HOST_3---:2181
> zookeeper.connection.timeout.ms=6000 {code}
> ZooKeeper has no SASL configuration and accepts unauthenticated connections.
> So when I start the Kafka brokers, the logs show an "Auth failed" error, but 
> the ZK client then falls back to an unauthenticated connection and everything 
> works fine.
> {code:java}
> [2021-12-16 14:04:08,451] INFO [ZooKeeperClient Kafka server] Waiting until 
> connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,462] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,466] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:AuthFailed type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,467] ERROR [ZooKeeperClient Kafka server] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,470] INFO Opening socket connection to server 
> ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,478] INFO Socket connection established, initiating 
> session, client: /===BROKER_2_IP===:48022, server: 
> ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,496] INFO Session establishment complete on server 
> ===ZOOKEEPER_HOST_1===:2181, sessionid = 0x10028c111930125, negotiated 
> timeout = 18000 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,497] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:SyncConnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,498] INFO [ZooKeeperClient Kafka server] Connected. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,753] INFO [feature-zk-node-event-process-thread]: 
> Starting 
> (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
>  {code}
> Then I restart the ZK node that holds the Kafka controller's connection 
> (===ZOOKEEPER_HOST_1===), or it restarts for some other reason, and the logs 
> show the following:
> {code:java}
> [2021-12-16 14:08:42,951] INFO Unable to read additional data from server 
> sessionid 0x10028c111930126, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:42,951] INFO Unable to read additional data from server 
> sessionid 0x10028c111930125, likely server has closed socket, closing socket 
> connection and attempting reconnect 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,052] DEBUG [ZooKeeperClient ACL authorizer] Received 
> event: WatchedEvent state:Disconnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,053] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:Disconnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,583] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,583] INFO Opening socket connection to server 
> ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,583] DEBUG [ZooKeeperClient ACL authorizer] Received 
> event: WatchedEvent state:AuthFailed type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,584] ERROR [ZooKeeperClient ACL authorizer] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,593] INFO Socket connection established, initiating 
> session, client: /===BROKER_2_IP===:48560, server: 
> ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,808] INFO Unable to read additional data from server 
> sessionid 0x10028c111930126, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,910] DEBUG [ZooKeeperClient ACL authorizer] Received 
> event: WatchedEvent state:Disconnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,019] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,019] INFO Opening socket connection to server 
> ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,019] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:AuthFailed type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,019] ERROR [ZooKeeperClient Kafka server] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,022] INFO Socket connection established, initiating 
> session, client: /===BROKER_2_IP===:37734, server: 
> ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,023] INFO Unable to read additional data from server 
> sessionid 0x10028c111930125, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,123] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:Disconnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,124] INFO [ZooKeeperClient Kafka server] Waiting until 
> connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,125] INFO [ZooKeeperClient Kafka server] Waiting until 
> connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,586] INFO [ZooKeeperClient ACL authorizer] 
> Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,704] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,704] INFO Opening socket connection to server 
> ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,705] DEBUG [ZooKeeperClient ACL authorizer] Received 
> event: WatchedEvent state:AuthFailed type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,705] ERROR [ZooKeeperClient ACL authorizer] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,707] INFO Socket connection established, initiating 
> session, client: /===BROKER_2_IP===:37738, server: 
> ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,709] INFO Session establishment complete on server 
> ===ZOOKEEPER_HOST_2===:2181, sessionid = 0x10028c111930126, negotiated 
> timeout = 18000 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,710] DEBUG [ZooKeeperClient ACL authorizer] Received 
> event: WatchedEvent state:SyncConnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,798] WARN SASL configuration failed: 
> javax.security.auth.login.LoginException: No JAAS configuration section named 
> 'Client' was found in specified JAAS configuration file: 
> '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper 
> server without SASL authentication, if Zookeeper server allows it. 
> (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,798] INFO Opening socket connection to server 
> ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,798] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:AuthFailed type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,798] ERROR [ZooKeeperClient Kafka server] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,807] INFO Socket connection established, initiating 
> session, client: /===BROKER_2_IP===:48568, server: 
> ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,824] INFO Session establishment complete on server 
> ===ZOOKEEPER_HOST_3===:2181, sessionid = 0x10028c111930125, negotiated 
> timeout = 18000 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,824] DEBUG [ZooKeeperClient Kafka server] Received 
> event: WatchedEvent state:SyncConnected type:None path:null 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. 
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:45,020] INFO [ZooKeeperClient Kafka server] 
> Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient) {code}
> The key points are:
> 1. The log above is from the Kafka controller node.
> 2. The connection to ZK was lost: _"Received event: WatchedEvent 
> state:Disconnected"_
> 3. The connection was re-established with the live ZK nodes: _"Opening socket 
> connection to server ===ZOOKEEPER_HOST_3===:2181"_ and _"Received event: 
> WatchedEvent state:SyncConnected"_
> 4. During reconnection we get the same _"Auth failed."_ error as at broker 
> startup.
> 5. The "Auth failed" event is caught in 
> *kafka.zookeeper.ZooKeeperClient.ZooKeeperClientWatcher.process*, which runs 
> this code:
> {code:java}
> if (initialized)
>     scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs) {code}
> where RetryBackoffMs is 1000 (ms).
> About one second after the "Auth failed" message, the log shows 
> "{_}Reinitializing due to auth failure.{_}".
> The method *ZooKeeperClient.scheduleReinitialize* calls 
> {*}ZooKeeperClient.reinitialize{*}.
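To make the trigger explicit, here is a minimal Java sketch of the watcher logic described above. The names are hypothetical and it is not Kafka code, which is Scala. It assumes only what the report shows: an AuthFailed event on an already initialized client schedules a reinitialization with a 1000 ms backoff. The reconnect sequence seen in the log (Disconnected, AuthFailed, SyncConnected) therefore still leads to a reinitialize, even though the client ends up connected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the AuthFailed handling in ZooKeeperClientWatcher.process.
// KeeperStateSketch / AuthFailedWatcherSketch are illustrative names, not Kafka or ZooKeeper APIs.
enum KeeperStateSketch { DISCONNECTED, AUTH_FAILED, SYNC_CONNECTED }

class AuthFailedWatcherSketch {
    static final long RETRY_BACKOFF_MS = 1000;   // RetryBackoffMs from the report
    boolean initialized = true;                  // the client completed its first connect
    final List<Long> scheduledReinitDelays = new ArrayList<>();

    void process(KeeperStateSketch state) {
        // Mirrors: if (initialized) scheduleReinitialize("auth-failed", ..., RetryBackoffMs)
        if (state == KeeperStateSketch.AUTH_FAILED && initialized) {
            scheduleReinitialize(RETRY_BACKOFF_MS);
        }
    }

    // Stand-in for ZooKeeperClient.scheduleReinitialize: records the delay
    // instead of scheduling a real task.
    void scheduleReinitialize(long delayMs) {
        scheduledReinitDelays.add(delayMs);
    }
}
```

Feeding the three states from the log into `process` leaves exactly one scheduled reinitialize with a 1000 ms delay. A later SyncConnected does not cancel it.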
>  
> {code:java}
>   private def reinitialize(): Unit = {
>     // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
>     // may require additional Zookeeper requests, which will block to acquire the initialization lock
>     stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
>
>     inWriteLock(initializationLock) {
>       if (!connectionState.isAlive) {
>         zooKeeper.close()
>         info(s"Initializing a new session to $connectString.")
>         // retry forever until ZooKeeper can be instantiated
>         var connected = false
>         while (!connected) {
>           try {
>             zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)
>             connected = true
>           } catch {
>             case e: Exception =>
>               info("Error when recreating ZooKeeper, retrying after a short sleep", e)
>               Thread.sleep(RetryBackoffMs)
>           }
>         }
>       }
>     }
>
>     stateChangeHandlers.values.foreach(callAfterInitializingSession _)
>   } {code}
> The code inside the "{_}inWriteLock(initializationLock){_}" block does not 
> run, because the connection has already been re-established with the live ZK 
> nodes (the message s"{_}Initializing a new session to $connectString.{_}" 
> never appears in the logs).
> However, "{*}callBeforeInitializingSession{*}" still calls 
> "{*}KafkaController.startup.beforeInitializingSession{*}", which fires the 
> "{*}ControllerEvent.Expire{*}" event, and "{*}callAfterInitializingSession{*}" 
> calls "{*}KafkaController.startup.afterInitializingSession{*}", which fires 
> the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event.
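The core of the problem is that the session-change callbacks run whether or not a new session is actually created. A minimal Java model of the quoted `reinitialize()` shows this. It is a hypothetical sketch, not Kafka's API: `synchronized` stands in for `inWriteLock`, and a boolean stands in for `connectionState.isAlive`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of ZooKeeperClient.reinitialize() as quoted above.
// All names are illustrative; they are not Kafka APIs.
interface SessionHandlerSketch {
    void beforeInitializingSession();
    void afterInitializingSession();
}

class ReinitializeSketch {
    boolean connectionAlive;      // stands in for connectionState.isAlive
    int sessionsCreated = 0;      // counts would-be "new ZooKeeper(...)" calls
    final List<SessionHandlerSketch> handlers = new ArrayList<>();

    ReinitializeSketch(boolean connectionAlive) {
        this.connectionAlive = connectionAlive;
    }

    void reinitialize() {
        // Like the Scala code, the "before" callbacks run unconditionally...
        handlers.forEach(SessionHandlerSketch::beforeInitializingSession);
        synchronized (this) {     // plays the role of inWriteLock(initializationLock)
            if (!connectionAlive) {
                sessionsCreated++;
                connectionAlive = true;
            }
        }
        // ...and so do the "after" callbacks, even when no new session was created.
        handlers.forEach(SessionHandlerSketch::afterInitializingSession);
    }
}

// Records which controller events the callbacks would fire.
class ControllerHandlerSketch implements SessionHandlerSketch {
    final List<String> firedEvents = new ArrayList<>();

    @Override
    public void beforeInitializingSession() { firedEvents.add("Expire"); }

    @Override
    public void afterInitializingSession() { firedEvents.add("RegisterBrokerAndReelect"); }
}
```

With `new ReinitializeSketch(true)`, calling `reinitialize()` creates no new session, yet the handler still records both `Expire` and `RegisterBrokerAndReelect`. This matches the log, where the controller resigns although its ZK session survived.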
> The "{*}ControllerEvent.Expire{*}" event calls 
> "{*}KafkaController.processExpire{*}", which shuts down the current 
> controller, and the logs show:
> {code:java}
> [2021-12-16 14:08:45,022] DEBUG [Controller id=2] Resigning 
> (kafka.controller.KafkaController)
> [2021-12-16 14:08:45,022] DEBUG [Controller id=2] Unregister 
> BrokerModifications handler for Set(1, 2, 3) 
> (kafka.controller.KafkaController)
> [2021-12-16 14:08:45,023] INFO [PartitionStateMachine controllerId=2] Stopped 
> partition state machine (kafka.controller.ZkPartitionStateMachine)
> [2021-12-16 14:08:45,025] INFO [ReplicaStateMachine controllerId=2] Stopped 
> replica state machine (kafka.controller.ZkReplicaStateMachine)
> [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutting 
> down (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Stopped 
> (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutdown 
> completed (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,031] INFO [RequestSendThread controllerId=2] Shutting 
> down (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,032] INFO [RequestSendThread controllerId=2] Stopped 
> (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,033] INFO [RequestSendThread controllerId=2] Shutdown 
> completed (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutting 
> down (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Stopped 
> (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutdown 
> completed (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,040] INFO [Controller id=2] Resigned 
> (kafka.controller.KafkaController) {code}
> Then the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event calls 
> "{*}KafkaController.processRegisterBrokerAndReelect{*}" => 
> "{*}KafkaController.elect{*}". But the logs show:
>  
> {code:java}
> [2021-12-16 14:08:45,129] DEBUG [Controller id=2] Broker 2 has been elected 
> as the controller, so stopping the election process. 
> (kafka.controller.KafkaController) {code}
> This is the part of *KafkaController.elect* that writes the message above:
>  
>  
> {code:java}
>   private def elect(): Unit = {
>     activeControllerId = zkClient.getControllerId.getOrElse(-1)
>     /*
>      * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
>      * it's possible that the controller has already been elected when we get here. This check will prevent the following
>      * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
>      */
>     if (activeControllerId != -1) {
>       debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
>       return
>     } {code}
> The problem is:
> The current controller shuts down because of the 
> "{*}KafkaController.processExpire{*}" event, but no new controller is elected, 
> because the ZK session survived (the same sessionid 0x10028c111930125 
> reconnects) and so the /controller node was never removed.
> So the logs show that Controller id=2 resigned and then stopped the election, 
> because it believes there is still a live controller, even though all of its 
> ZK listeners have already been shut down.
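The resulting deadlock can be reproduced with a small, hypothetical Java model. The names are invented; Kafka's real controller is Scala and far more involved. An `int` stands in for the ephemeral /controller znode, with -1 meaning the znode is absent. Expire only flips the local state. Because the znode still names broker 2, the early return in `elect()` fires and no broker ends up acting as controller.

```java
// Hypothetical model of the Expire / RegisterBrokerAndReelect handling described
// in this report. zkControllerId stands in for the ephemeral /controller znode
// (-1 means the znode does not exist). Not Kafka's API.
class ElectionSketch {
    final int brokerId;
    int zkControllerId;   // broker id stored in /controller, or -1
    boolean isActive;     // is this broker actually acting as controller?

    ElectionSketch(int brokerId) {
        this.brokerId = brokerId;
        this.zkControllerId = brokerId;  // this broker won the initial election
        this.isActive = true;
    }

    // Expire: resign locally. The ZK session itself survived, so the
    // ephemeral /controller znode is NOT deleted.
    void processExpire() {
        isActive = false;
    }

    // Mirrors the early return quoted from KafkaController.elect().
    void elect() {
        int activeControllerId = zkControllerId;
        if (activeControllerId != -1) {
            return;  // "Broker N has been elected as the controller, so stopping ..."
        }
        zkControllerId = brokerId;  // models a successful createEphemeralPath
        isActive = true;
    }
}
```

After `new ElectionSketch(2)`, `processExpire()` and `elect()`, `zkControllerId` is still 2 while `isActive` is false. ZooKeeper names broker 2 as the controller, but nothing is acting as one. If the znode were absent (-1), `elect()` would succeed normally.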
> If any non-controller broker is then restarted (by me or for any other 
> reason), it never receives metadata, because there is no active controller in 
> the cluster.
> Consuming data from such a broker fails with the following errors:
> {code:java}
> /opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning \
>   --bootstrap-server $(hostname):9091 --consumer.config ~/connect.properties
> WARN [Consumer clientId=consumer-console-consumer-57293-1, 
> groupId=console-consumer-57293] Error while fetching metadata with 
> correlation id 2 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient)
> WARN [Consumer clientId=consumer-console-consumer-57293-1, 
> groupId=console-consumer-57293] Error while fetching metadata with 
> correlation id 3 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} 
> (org.apache.kafka.clients.NetworkClient){code}
> UPD: Workarounds for this bug:
>  # Set up a proper SASL connection to ZK, or
>  # Disable SASL for the ZK client via the JVM system property 
> "-Dzookeeper.sasl.client=false"
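For the second workaround, here is a minimal sketch. It assumes the brokers are started with bin/kafka-server-start.sh, which passes KAFKA_OPTS to the JVM through kafka-run-class.sh. It also assumes KAFKA_OPTS may already carry other options, such as the JAAS file path, so it appends rather than overwrites. The server.properties path in the comment is only an example.

```shell
# Sketch: disable SASL in the broker's ZooKeeper client (JVM-wide system property).
# Appends to any existing KAFKA_OPTS, e.g. -Djava.security.auth.login.config=...
export KAFKA_OPTS="${KAFKA_OPTS:-} -Dzookeeper.sasl.client=false"
# Then restart the broker as usual, for example (example path):
# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
```

The first workaround instead requires SASL to be enabled on the ZooKeeper servers. It also requires a 'Client' section in /etc/kafka/kafka_server_jaas.conf, which is the section the WARN lines above report as missing.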
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
