[ https://issues.apache.org/jira/browse/KAFKA-13556?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
joecqupt resolved KAFKA-13556.
------------------------------
      Assignee: joecqupt
    Resolution: Duplicate

> Kafka cluster loses the controller
> ----------------------------------
>
> Key: KAFKA-13556
> URL: https://issues.apache.org/jira/browse/KAFKA-13556
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.8.1
> Reporter: Andrei Lakhmanets
> Assignee: joecqupt
> Priority: Major
>
> Hi team,
> *Kafka version:* 2.8.1
> *Configuration:* 3 Kafka brokers in different availability zones and 3 ZooKeeper nodes in different availability zones.
> I ran into a bug where the Kafka cluster loses its controller; if any non-controller broker is restarted after that, it stops processing data.
> *Context:*
> The Kafka cluster is configured with SASL on its listeners.
> {code:java}
> # Listeners config
> listeners=SASL_SSL://[::]:9091,SASL_PLAINTEXT://[::]:9092
> advertised.listeners=SASL_SSL://---DOMAIN---:9091,SASL_PLAINTEXT://---DOMAIN---:9092
> sasl.enabled.mechanisms=SCRAM-SHA-512
> inter.broker.listener.name=SASL_SSL
> sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
> inter.broker.protocol.version=2.8-IV0
> # Keystore config
> ssl.keystore.location=/etc/kafka/ssl/server.keystore.jks
> ssl.keystore.password=---PASSWORD---
> ssl.key.password=---PASSWORD---
> ssl.truststore.location=/etc/kafka/ssl/server.truststore.jks
> ssl.truststore.password=---PASSWORD---
> zookeeper.connect=---ZK_HOST_1---:2181,---ZK_HOST_2---:2181,---ZK_HOST_3---:2181
> zookeeper.connection.timeout.ms=6000
> {code}
> ZooKeeper has no SASL configuration and accepts connections without any authentication.
> So when I start the Kafka brokers, I see an auth-failed error in the logs, but the ZK client then falls back to an unauthenticated connection and everything works fine.
> {code:java}
> [2021-12-16 14:04:08,451] INFO [ZooKeeperClient Kafka server] Waiting until connected.
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,462] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,466] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,467] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,470] INFO Opening socket connection to server ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,478] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48022, server: ===ZOOKEEPER_HOST_1===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,496] INFO Session establishment complete on server ===ZOOKEEPER_HOST_1===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:04:08,497] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,498] INFO [ZooKeeperClient Kafka server] Connected.
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:04:08,753] INFO [feature-zk-node-event-process-thread]: Starting (kafka.server.FinalizedFeatureChangeListener$ChangeNotificationProcessorThread)
> {code}
> Then I restart the ZK node that holds the connection with the Kafka controller broker (===ZOOKEEPER_HOST_1===), or it restarts for some other reason, and I see the following in the logs:
> {code:java}
> [2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:42,951] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,052] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,053] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,583] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,583] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,583] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,584] ERROR [ZooKeeperClient ACL authorizer] Auth failed.
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:43,593] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48560, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,808] INFO Unable to read additional data from server sessionid 0x10028c111930126, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:43,910] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,019] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,019] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,019] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,019] ERROR [ZooKeeperClient Kafka server] Auth failed.
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,022] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37734, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,023] INFO Unable to read additional data from server sessionid 0x10028c111930125, likely server has closed socket, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,123] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:Disconnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,124] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,125] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,586] INFO [ZooKeeperClient ACL authorizer] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,704] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,704] INFO Opening socket connection to server ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,705] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,705] ERROR [ZooKeeperClient ACL authorizer] Auth failed.
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,707] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:37738, server: ===ZOOKEEPER_HOST_2===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,709] INFO Session establishment complete on server ===ZOOKEEPER_HOST_2===:2181, sessionid = 0x10028c111930126, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,710] DEBUG [ZooKeeperClient ACL authorizer] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,798] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/etc/kafka/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,798] INFO Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,798] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:AuthFailed type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,798] ERROR [ZooKeeperClient Kafka server] Auth failed.
> (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,807] INFO Socket connection established, initiating session, client: /===BROKER_2_IP===:48568, server: ===ZOOKEEPER_HOST_3===:2181 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,824] INFO Session establishment complete on server ===ZOOKEEPER_HOST_3===:2181, sessionid = 0x10028c111930125, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
> [2021-12-16 14:08:44,824] DEBUG [ZooKeeperClient Kafka server] Received event: WatchedEvent state:SyncConnected type:None path:null (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:44,824] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
> [2021-12-16 14:08:45,020] INFO [ZooKeeperClient Kafka server] Reinitializing due to auth failure. (kafka.zookeeper.ZooKeeperClient)
> {code}
> The main points here are:
> 1. The log above is from the Kafka controller broker.
> 2. The connection to ZK was lost: _"Received event: WatchedEvent state:Disconnected"_
> 3. The connection was re-established with the live ZK nodes: _"Opening socket connection to server ===ZOOKEEPER_HOST_3===:2181"_ and _"Received event: WatchedEvent state:SyncConnected"_
> 4. During reconnection we get the same _"Auth failed."_ error as at broker startup.
> 5. The "Auth failed" event is handled in *kafka.zookeeper.ZooKeeperClient.ZooKeeperClientWatcher.process*, which calls:
> {code:java}
> if (initialized)
>   scheduleReinitialize("auth-failed", "Reinitializing due to auth failure.", RetryBackoffMs)
> {code}
> where RetryBackoffMs is 1000.
> One second after the "Auth failed" message we see "{_}Reinitializing due to auth failure.{_}" in the log.
> The method *ZooKeeperClient.scheduleReinitialize* calls {*}ZooKeeperClient.reinitialize{*}.
>
> {code:java}
> private def reinitialize(): Unit = {
>   // Initialization callbacks are invoked outside of the lock to avoid deadlock potential since their completion
>   // may require additional Zookeeper requests, which will block to acquire the initialization lock
>   stateChangeHandlers.values.foreach(callBeforeInitializingSession _)
>
>   inWriteLock(initializationLock) {
>     if (!connectionState.isAlive) {
>       zooKeeper.close()
>       info(s"Initializing a new session to $connectString.")
>       // retry forever until ZooKeeper can be instantiated
>       var connected = false
>       while (!connected) {
>         try {
>           zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher, clientConfig)
>           connected = true
>         } catch {
>           case e: Exception =>
>             info("Error when recreating ZooKeeper, retrying after a short sleep", e)
>             Thread.sleep(RetryBackoffMs)
>         }
>       }
>     }
>   }
>
>   stateChangeHandlers.values.foreach(callAfterInitializingSession _)
> }
> {code}
> The code inside the "{_}inWriteLock(initializationLock){_}" block does not run, because the connection has already been re-established with the live ZK nodes (I did not find the message s"{_}Initializing a new session to $connectString.{_}" in the logs).
> However, "{*}callBeforeInitializingSession{*}" calls "{*}KafkaController.startup.beforeInitializingSession{*}", which fires the "{*}ControllerEvent.Expire{*}" event, and "{*}callAfterInitializingSession{*}" calls "{*}KafkaController.startup.afterInitializingSession{*}", which fires the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event.
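The control flow described above can be sketched as follows. This is a simplified model and not the Kafka source: `ReinitializeSketch`, its `connectionAlive` parameter, and the step names are hypothetical and exist only for illustration. It shows that the before/after callbacks run on every reinitialize() call, while a new session is created only when the connection is no longer alive.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical model of ZooKeeperClient.reinitialize(); not the real implementation.
object ReinitializeSketch {
  // Returns, in order, the steps a single reinitialize() call would perform.
  def reinitialize(connectionAlive: Boolean): List[String] = {
    val steps = ListBuffer.empty[String]
    // Runs unconditionally: the controller's handler fires ControllerEvent.Expire.
    steps += "beforeInitializingSession"
    // Guarded: only a dead connection leads to a new ZooKeeper session.
    if (!connectionAlive) steps += "newSession"
    // Runs unconditionally: the handler fires ControllerEvent.RegisterBrokerAndReelect.
    steps += "afterInitializingSession"
    steps.toList
  }

  def main(args: Array[String]): Unit = {
    // Connection already re-established, as in the report: both callbacks still fire.
    println(reinitialize(connectionAlive = true))
    // Connection dead: a new session is created between the two callbacks.
    println(reinitialize(connectionAlive = false))
  }
}
```

With `connectionAlive = true` the list contains no "newSession" step, which matches the missing "Initializing a new session" log message in the report.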
> The "{*}ControllerEvent.Expire{*}" event calls the "{*}KafkaController.processExpire{*}" method, which shuts down the current controller, and we see the following in the logs:
> {code:java}
> [2021-12-16 14:08:45,022] DEBUG [Controller id=2] Resigning (kafka.controller.KafkaController)
> [2021-12-16 14:08:45,022] DEBUG [Controller id=2] Unregister BrokerModifications handler for Set(1, 2, 3) (kafka.controller.KafkaController)
> [2021-12-16 14:08:45,023] INFO [PartitionStateMachine controllerId=2] Stopped partition state machine (kafka.controller.ZkPartitionStateMachine)
> [2021-12-16 14:08:45,025] INFO [ReplicaStateMachine controllerId=2] Stopped replica state machine (kafka.controller.ZkReplicaStateMachine)
> [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,026] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,031] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,032] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,033] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutting down (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Stopped (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,037] INFO [RequestSendThread controllerId=2] Shutdown completed (kafka.controller.RequestSendThread)
> [2021-12-16 14:08:45,040] INFO [Controller id=2] Resigned (kafka.controller.KafkaController)
> {code}
> Then the "{*}ControllerEvent.RegisterBrokerAndReelect{*}" event calls "{*}KafkaController.processRegisterBrokerAndReelect{*}" => "{*}KafkaController.elect{*}". But the logs show the following:
>
> {code:java}
> [2021-12-16 14:08:45,129] DEBUG [Controller id=2] Broker 2 has been elected as the controller, so stopping the election process. (kafka.controller.KafkaController)
> {code}
> This is the part of the *KafkaController.elect* method that writes the message above:
>
> {code:java}
> private def elect(): Unit = {
>   activeControllerId = zkClient.getControllerId.getOrElse(-1)
>   /*
>    * We can get here during the initial startup and the handleDeleted ZK callback. Because of the potential race condition,
>    * it's possible that the controller has already been elected when we get here. This check will prevent the following
>    * createEphemeralPath method from getting into an infinite loop if this broker is already the controller.
>    */
>   if (activeControllerId != -1) {
>     debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
>     return
>   }
> {code}
> The problem is:
> The current controller shut down because of the "{*}KafkaController.processExpire{*}" call, but a new one was not elected because ZK did not remove the /controller node.
> So the logs show that Controller id=2 resigned and then stopped the election because it believes it is still the live controller, although all of its ZK listeners have already been shut down.
> If I then restart any non-controller broker (or it restarts for any reason), it does not receive metadata because there is no controller in the cluster.
> If we then try to consume data from these brokers, we get the following errors:
> {code:java}
> /opt/kafka/bin/kafka-console-consumer.sh --topic test_topic --from-beginning --bootstrap-server $(hostname):9091 --consumer.config ~/connect.properties
> WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 2 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
> WARN [Consumer clientId=consumer-console-consumer-57293-1, groupId=console-consumer-57293] Error while fetching metadata with correlation id 3 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
> {code}
> UPD: Workaround for this bug:
> # Set up a proper SASL connection for ZK
> # Disable the SASL connection for ZK via the property "-Dzookeeper.sasl.client=false"
>

--
This message was sent by Atlassian Jira
(v8.20.1#820001)