[ https://issues.apache.org/jira/browse/KAFKA-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexandre Dupriez updated KAFKA-14845:
--------------------------------------
    Attachment: broker-registration.drawio (4).png

Broker ZNode creation can fail due to lost Zookeeper Session ID
---------------------------------------------------------------

Key: KAFKA-14845
URL: https://issues.apache.org/jira/browse/KAFKA-14845
Project: Kafka
Issue Type: Bug
Reporter: Alexandre Dupriez
Assignee: Alexandre Dupriez
Priority: Minor
Attachments: broker-registration.drawio (4).png, kafka-broker-reg.log

Our production environment encountered a case where the registration of a broker failed due to the presence of a "conflicting" broker znode in Zookeeper. This case is similar to the one fixed by KAFKA-6584, which was induced by the Zookeeper bug (or feature) tracked in ZOOKEEPER-2985, still open as of today.

A network partition disrupted communication between the Kafka and Zookeeper clusters for about 20% of the brokers in the cluster. One of these brokers was not able to re-register with Zookeeper and was excluded from the cluster until it was restarted. The broker logs show that the registration failed due to a "conflicting" znode write which, in this case, does not exactly match the scenario covered by KAFKA-6584.

The sequence of logs on the broker is as follows.

First, a connection is established with Zookeeper node 3.
{code:java}
[2023-03-05 16:01:55,342] INFO Socket connection established, initiating session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182 (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40, L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182] (org.apache.zookeeper.ClientCnxnSocketNetty)
{code}

An existing Zookeeper session had expired, and upon reconnection, the Zookeeper state change handler was invoked. The creation of the ephemeral znode /brokers/ids/18 started on the controller thread.
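The client-side timeouts quoted in this report (6000 ms for sessionid 0x0 in production; 12001 ms, then 18005 ms, in the reproduction) are consistent with the way the Zookeeper client derives its timers from the session timeout in {{ClientCnxn}}: the read timeout is two thirds of the session timeout, a ping is nominally due after half the read timeout, and the connect timeout is the session timeout divided by the number of servers in the connect string. The sketch below reproduces only that arithmetic. It assumes an 18000 ms session timeout (the negotiated value in the logs) and, for production, a connect string listing three servers, which the logs do not show directly. The class and method names are hypothetical.

```java
/**
 * Hypothetical helper reproducing the Zookeeper client's timer arithmetic
 * for a given session timeout and number of servers in the connect string.
 */
public final class ZkClientTimeouts {

    /** Read timeout used once connected: two thirds of the session timeout. */
    static int readTimeoutMs(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    /** Nominal ping interval: half of the read timeout. */
    static int pingIntervalMs(int sessionTimeoutMs) {
        return readTimeoutMs(sessionTimeoutMs) / 2;
    }

    /** Connect timeout used while connecting: the session timeout split across servers. */
    static int connectTimeoutMs(int sessionTimeoutMs, int serverCount) {
        return sessionTimeoutMs / serverCount;
    }

    public static void main(String[] args) {
        int sessionTimeoutMs = 18_000; // negotiated timeout reported in the logs

        // Production: assumption of three servers in the connect string.
        System.out.println("production connect timeout: " + connectTimeoutMs(sessionTimeoutMs, 3) + " ms");

        // Reproduction: a single server (localhost:2181).
        System.out.println("repro connect timeout: " + connectTimeoutMs(sessionTimeoutMs, 1) + " ms");
        System.out.println("repro read timeout: " + readTimeoutMs(sessionTimeoutMs) + " ms");
        System.out.println("repro ping interval: " + pingIntervalMs(sessionTimeoutMs) + " ms");
    }
}
```

Under these assumptions, the connect timeout is 6000 ms in production and 18000 ms in the single-server reproduction, and the read timeout (12000 ms) and ping interval (6000 ms) match the reproduction's step 3.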
{code:java}
[2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false) (kafka.zk.KafkaZkClient)
{code}

The client "session" timed out after 6 seconds. Note that the session ID is 0x0 and that there is no "{_}Session establishment complete{_}" log: the broker appears never to have received or processed the response from the Zookeeper node.
{code:java}
[2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from server in 6000ms for sessionid 0x0, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40, L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182] (org.apache.zookeeper.ClientCnxnSocketNetty)
{code}

Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client started waiting for a new connection notification.
{code:java}
[2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
{code}

A new connection was created with Zookeeper node 1. Note that a valid (new) session ({{0x1006c6e0b830001}}) was reported this time.
{code:java}
[2023-03-05 16:02:02,037] INFO Socket connection established, initiating session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182 (org.apache.zookeeper.ClientCnxn)
[2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106, L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182] (org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-05 16:02:03,054] INFO Session establishment complete on server zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
{code}

The Kafka ZK client is notified of the connection.
{code:java}
[2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
{code}

The broker sends the request to create the znode {{/brokers/ids/18}}, which already exists. The error path implemented for KAFKA-6584 is then followed. However, in this case, the session owning the ephemeral node, {{0x300000043230ac1}} ({{216172783240153793}}), differs from the last active Zookeeper session recorded by the broker. It also differs from the current session {{0x1006c6e0b830001}} ({{72176813933264897}}), hence the recreation of the broker znode is not attempted.
{code:java}
[2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at /brokers/ids/18, node already exists and owner '216172783240153793' does not match current session '72176813933264897' (kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
    at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726)
    at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
    at kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810)
    at kafka.controller.KafkaController.process(KafkaController.scala:1853)
    at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51)
    at kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127)
    at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
{code}

The session {{0x300000043230ac1}} expires later on, as indicated in the Zookeeper server logs:
{code:java}
[2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of 18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-05 16:02:21,336] INFO Submitting global closeSession request for session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer)
{code}

The ephemeral znode is then deleted and never recreated, so the broker is no longer registered. At this point, only a broker restart (or a forced recreation of the Zookeeper session) can mitigate the problem.

An analysis of the commit logs from Zookeeper shows the following sequence of transactions, with timestamps from the Zookeeper node:
- 2023-03-05T16:02:00.973Z: CreateSession [-10] => 0x300000043230ac1
- 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18), SetData(<BrokerInfo>)]
- 2023-03-05T16:02:21.336Z: CloseSession [-11]

The fix for KAFKA-6584 does not cover this case because here the session ID is never surfaced to, nor recorded by, the ZK client in Kafka (there were no lower-level logs to ascertain whether the Netty client ever received a response for that session).

As a remediation, the source of identity of the broker (currently conveyed by the Zookeeper session ID) could perhaps be added explicitly to the znode data. Assuming the Zookeeper Multi is atomic, the znode carries the BrokerInfo (or any other user data provided with the SetData operation) if and only if it was successfully created.

----
*Update:* this can be reproduced with this [automated test|https://github.com/Hangleton/kafka-tools/tree/master/kafka-broker-reg].

The test produces the following sequence of events.
1) The Zookeeper client is created by the application. It opens a TCP connection, then sends a Connect request, which is processed on the Netty NIO thread pool. A CreateSession request is enqueued internally to be handled by the synchronous request processor. Once it is processed, a session ID is generated and recorded, and that session ID, 0x1001764b3920000, is returned to the client.
{code:java}
[2023-03-29 16:22:04,258] INFO Socket connection established, initiating session, client: /127.0.0.1:50093, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:04,268] INFO channel is connected: [id: 0x2e3e926b, L:/127.0.0.1:50093 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920000 type:createSession cxid:0x0 zxid:0xcc0 txntype:-10 reqpath:n/a
[2023-03-29 16:22:04,310] INFO Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x1001764b3920000, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:04,315] INFO [ZooKeeperClient ZkClient] Connected. (kafka.zookeeper.ZooKeeperClient)
{code}

2) The Kafka client on top of the Zookeeper client is used to register the broker. The multiTransaction API of the Zookeeper client is invoked, and a multi request containing CreateNode and SetData operations is sent to Zookeeper.
{code:java}
[2023-03-29 16:22:04,470] INFO Creating /brokers/ids/18 (is it secure? false) (kafka.zk.KafkaZkClient)
sessionid:0x1001764b3920000 type:multi cxid:0x1 zxid:0xcc1 txntype:14 reqpath:n/a
[2023-03-29 16:22:04,560] INFO Stat of the created znode at /brokers/ids/18 is: 3265,3265,1680103324542,1680103324542,1,0,0,72083315314786304,131,0,3265 (kafka.zk.KafkaZkClient)
[2023-03-29 16:22:04,561] INFO Registered broker 18 at path /brokers/ids/18 with addresses: PLAINTEXT://localhost:9092, czxid (broker epoch): 3265 (kafka.zk.KafkaZkClient)
{code}

3) The client sends a Ping request every 6 seconds (half of the read timeout). The responses to these pings are not sent back to the client (this is enforced by the test; see the "Dropping" keyword in stdout). After 12 seconds (the read timeout), the client initiates a new connection.
{code:java}
[ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
Dropping -2,3265,0
[ALTERED RQ] sessionid:0x1001764b3920000 type:ping cxid:0xfffffffffffffffe zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
Dropping -2,3265,0
[2023-03-29 16:22:16,555] WARN Client session timed out, have not heard from server in 12001ms for session id 0x1001764b3920000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:16,559] WARN Session 0x1001764b3920000 for server localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:16,567] INFO channel is disconnected: [id: 0x2e3e926b, L:/127.0.0.1:50093 ! R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
{code}

4) A new TCP connection is opened and the Connect request is sent. A delay is artificially injected to make the connection request time out (the connection timeout is equal to the session timeout in this case).
{code:java}
[2023-03-29 16:22:18,388] INFO Opening socket connection to server localhost/[0:0:0:0:0:0:0:1]:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:18,395] INFO Socket connection established, initiating session, client: /[0:0:0:0:0:0:0:1]:50121, server: localhost/[0:0:0:0:0:0:0:1]:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:18,396] INFO channel is connected: [id: 0xc39fba08, L:/[0:0:0:0:0:0:0:1]:50121 - R:localhost/[0:0:0:0:0:0:0:1]:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
>>>> CONNECTION DELAY = 18500 ms
{code}

5) After 18 seconds (which is both the session and the connection timeout), the server expires the session: an internal closeSession request is generated on the server and enqueued to be processed by the synchronous request processor.
{code:java}
[2023-03-29 16:22:34,638] INFO Expiring session 0x1001764b3920000, timeout of 18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
sessionid:0x1001764b3920000 type:closeSession cxid:0x0 zxid:0xcc3 txntype:-11 reqpath:n/a
[2023-03-29 16:22:36,401] WARN Client session timed out, have not heard from server in 18005ms for session id 0x1001764b3920000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:36,401] WARN Session 0x1001764b3920000 for server localhost/[0:0:0:0:0:0:0:1]:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
{code}

6) In parallel, the client detects the connection timeout and reconnects. A new TCP connection is opened, and the server notifies the client that the session 0x1001764b3920000 has expired.
{code:java}
[2023-03-29 16:22:37,972] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,976] INFO Socket connection established, initiating session, client: /127.0.0.1:50131, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,976] INFO channel is connected: [id: 0x17aecbd7, L:/127.0.0.1:50131 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
[2023-03-29 16:22:37,988] INFO Invalid session 0x1001764b3920000 for client /127.0.0.1:50131, probably expired (org.apache.zookeeper.server.ZooKeeperServer)
[2023-03-29 16:22:37,989] WARN Unable to reconnect to ZooKeeper service, session 0x1001764b3920000 has expired (org.apache.zookeeper.ClientCnxn)
{code}

7) The client closes its "handle" (i.e. the client, in Zookeeper terminology) with Zookeeper.
{code:java}
[2023-03-29 16:22:37,994] INFO EventThread shut down for session: 0x1001764b3920000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:37,996] INFO [ZooKeeperClient ZkClient] Session expired. (kafka.zookeeper.ZooKeeperClient)
[2023-03-29 16:22:37,998] INFO [ZooKeeperClient ZkClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)
{code}

8) The client creates a new handle, which initiates a new session with Zookeeper. A new TCP connection is opened, and the createSession request is propagated to the synchronous request processor. The session is created by the server, but the response is never sent back to the client.
{code:java}
[2023-03-29 16:22:37,998] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=18000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@649bec2e (org.apache.zookeeper.ZooKeeper)
[2023-03-29 16:22:37,999] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:38,005] INFO Socket connection established, initiating session, client: /127.0.0.1:50132, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:38,006] INFO channel is connected: [id: 0x435b25a4, L:/127.0.0.1:50132 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
[ALTERED RQ] sessionid:0x1001764b3920001 type:createSession cxid:0x0 zxid:0xcc4 txntype:-10 reqpath:n/a
Dropping [Ljava.nio.ByteBuffer;@2a9f4f64
{code}

9) An inter-thread signal, triggered during the creation of the second session on the synchronous request processor, is captured by the main (application) thread, which starts a broker registration. The client immediately sends the multi request to the server over the open TCP connection, even before receiving confirmation that the ConnectRequest was processed successfully, and without knowing the ID of the current session. The server processes the multi request and creates the znode /brokers/ids/18 using the session 0x1001764b3920001. The response is not sent to the client, which is therefore not made aware that the request has been processed.
{code:java}
[2023-03-29 16:22:38,023] INFO Creating /brokers/ids/18 (is it secure? false) (kafka.zk.KafkaZkClient)
[ALTERED RQ] sessionid:0x1001764b3920001 type:multi cxid:0x1 zxid:0xcc5 txntype:14 reqpath:n/a
Dropping 1,3269,0
org.apache.zookeeper.MultiResponse@99bc5055
{code}

10) The connection times out again, since no response is sent to the client within 18 seconds. The Zookeeper client library fails the multi request with a CONNECTIONLOSS error. The Kafka Zookeeper client built on top of it retries on this type of error, so it waits for a new connection to be established and then retries.
{code:java}
[2023-03-29 16:22:56,012] WARN Client session timed out, have not heard from server in 18005ms for session id 0x0 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:56,014] WARN Session 0x0 for server localhost/127.0.0.1:2181, Closing socket connection. Attempting reconnect except it is a SessionExpiredException. (org.apache.zookeeper.ClientCnxn)
{code}

11) A new connection attempt is made. This time, we allow it to go through.
{code:java}
[2023-03-29 16:22:57,490] INFO Opening socket connection to server localhost/127.0.0.1:2181. (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,494] INFO Socket connection established, initiating session, client: /127.0.0.1:50166, server: localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,495] INFO channel is connected: [id: 0xa086adad, L:/127.0.0.1:50166 - R:localhost/127.0.0.1:2181] (org.apache.zookeeper.ClientCnxnSocketNetty)
sessionid:0x1001764b3920002 type:createSession cxid:0x0 zxid:0xcc6 txntype:-10 reqpath:n/a
[2023-03-29 16:22:57,522] INFO Session establishment complete on server localhost/127.0.0.1:2181, session id = 0x1001764b3920002, negotiated timeout = 18000 (org.apache.zookeeper.ClientCnxn)
[2023-03-29 16:22:57,522] INFO [ZooKeeperClient ZkClient] Connected. (kafka.zookeeper.ZooKeeperClient)
{code}

12) The multi request is retried and a NODEEXISTS response is received. Kafka then sends a getData request to Zookeeper to find the ephemeral owner of the znode. The ephemeral owner is 0x1001764b3920001 (72083315314786305), which matches neither the current session 0x1001764b3920002 (72083315314786306) nor the session previously recorded by the Kafka client during the first znode creation (0x1001764b3920000).
{code:java}
sessionid:0x1001764b3920002 type:multi cxid:0x2 zxid:0xcc7 txntype:14 reqpath:n/a
sessionid:0x1001764b3920002 type:getData cxid:0x3 zxid:0xfffffffffffffffe txntype:unknown reqpath:/brokers/ids/18
[2023-03-29 16:22:57,539] ERROR Error while creating ephemeral at /brokers/ids/18, node already exists and owner '72083315314786305' does not match current session '72083315314786306' (kafka.zk.KafkaZkClient$CheckedEphemeral)
org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:2185)
    at kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:2123)
    at kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:2090)
    at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:102)
    at repro.BrokerRegistrationTest.main(BrokerRegistrationTest.java:172)
{code}

Note that in the diagram, we introduce a delay in the expiration of the session 0x1001764b3920001. The expiration of a session, and the deletion of its associated ephemeral nodes, is scheduled on the synchronous processor asynchronously with respect to incoming requests, so it is possible for the multi request to arrive and be consumed by the synchronous processor before the closeSession request for 0x1001764b3920001. The ephemeral znode for that session is therefore still present in Zookeeper's data tree when the new multi request is processed.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
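To contrast the KAFKA-6584 check with the remediation suggested in this issue (carrying the broker's identity in the znode data), here is a self-contained simulation in plain Java with no Zookeeper dependency. It models only the ephemeral owner and the data of /brokers/ids/18 as observed in steps 9 and 12 of the reproduction, and it also shows how the hexadecimal and decimal session IDs in the logs relate. {{ZnodeState}}, {{ownerMatchesSession}}, {{dataMatchesRegistration}} and the per-registration token are hypothetical; the token stands in for whatever identity a fix might write with SetData. It requires Java 16 or later (for the record).

```java
import java.util.UUID;

/**
 * Plain-Java simulation (no Zookeeper dependency) of the ownership checks
 * discussed in KAFKA-14845. All names are hypothetical.
 */
public final class EphemeralOwnershipSketch {

    /** Minimal view of an existing ephemeral znode: owner session and identity found in its data. */
    record ZnodeState(long ephemeralOwner, String registrationToken) {}

    /** Parses a session ID as printed in Zookeeper logs, e.g. "0x1001764b3920001". */
    static long parseSessionId(String hex) {
        return Long.parseUnsignedLong(hex.replaceFirst("^0x", ""), 16);
    }

    /** KAFKA-6584-style check: the znode is ours only if the current session owns it. */
    static boolean ownerMatchesSession(ZnodeState node, long currentSessionId) {
        return node.ephemeralOwner() == currentSessionId;
    }

    /**
     * Check in the spirit of the proposed remediation: the znode is ours if its data
     * carries the identity written by this registration, whichever session owns it.
     */
    static boolean dataMatchesRegistration(ZnodeState node, String registrationToken) {
        return node.registrationToken().equals(registrationToken);
    }

    public static void main(String[] args) {
        // Hypothetical identity written by this broker together with its BrokerInfo.
        String token = UUID.randomUUID().toString();

        // Step 9: the multi request was applied under session 0x1001764b3920001,
        // whose ID the client never learned.
        ZnodeState existing = new ZnodeState(parseSessionId("0x1001764b3920001"), token);

        // Step 12: the retry runs under session 0x1001764b3920002 and gets NODEEXISTS.
        long current = parseSessionId("0x1001764b3920002");

        System.out.println("owner (decimal): " + existing.ephemeralOwner());
        System.out.println("current session (decimal): " + current);
        System.out.println("owner matches current session: " + ownerMatchesSession(existing, current));
        System.out.println("data matches this registration: " + dataMatchesRegistration(existing, token));
    }
}
```

In this simulation, the owner check fails (72083315314786305 vs. 72083315314786306, as in the error log), while the data check recognizes the znode as written by this registration. What the broker should then do, for example delete the orphaned znode and recreate it under its current session, is outside the scope of this sketch.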