[https://issues.apache.org/jira/browse/KAFKA-14845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17705998#comment-17705998]

Alexandre Dupriez commented on KAFKA-14845:
-------------------------------------------

The reproduction test case has been updated and is available [on 
GitHub|https://github.com/Hangleton/kafka-tools/tree/master/kafka-broker-reg]. 
It no longer requires any forced session renewal. It reproduces the use case 
using only:
 * Connection delay
 * Response drops
 * Session expiration delay

> Broker ZNode creation can fail due to a session ID unknown to the broker
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-14845
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14845
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Alexandre Dupriez
>            Assignee: Alexandre Dupriez
>            Priority: Minor
>         Attachments: broker-registration.drawio.png
>
>
> Our production environment hit a case where the registration of a broker 
> failed because of a "conflicting" broker znode in Zookeeper. This case is 
> similar to the one fixed by KAFKA-6584, which was induced by the Zookeeper 
> bug (or feature) tracked in ZOOKEEPER-2985, still open as of today.
> A network partition disrupted communication between the Kafka and 
> Zookeeper clusters for about 20% of the brokers in the cluster. One of these 
> brokers could not re-register with Zookeeper and stayed excluded from the 
> cluster until it was restarted. Broker logs show that the registration failed 
> because of a "conflicting" znode write. In this case, that does not exactly 
> match the scenario covered by KAFKA-6584.
> The sequence of logs on the broker is as follows.
> First, a connection is established with Zookeeper node 3.
> {code:java}
> [2023-03-05 16:01:55,342] INFO Socket connection established, initiating 
> session, client: /1.2.3.4:40200, server: zk.3/5.6.7.8:2182 
> (org.apache.zookeeper.ClientCnxn)
> [2023-03-05 16:01:55,342] INFO channel is connected: [id: 0x2b45ae40, 
> L:/1.1.3.4:40200 - R:zk.3/5.6.7.8:2182] 
> (org.apache.zookeeper.ClientCnxnSocketNetty){code}
> An existing Zookeeper session had expired, and upon reconnection the 
> Zookeeper state change handler was invoked. The creation of the ephemeral 
> znode /brokers/ids/18 started on the controller thread.
> {code:java}
> [2023-03-05 16:01:55,345] INFO Creating /brokers/ids/18 (is it secure? false) 
> (kafka.zk.KafkaZkClient){code}
> The client "session" timed out after 6 seconds. Note that the session ID is 
> 0x0 and that there is no "_Session establishment complete_" log. The broker 
> appears never to have received or processed the response from Zookeeper node 3.
> {code:java}
> [2023-03-05 16:02:01,343] INFO Client session timed out, have not heard from 
> server in 6000ms for sessionid 0x0, closing socket connection and attempting 
> reconnect (org.apache.zookeeper.ClientCnxn)
> [2023-03-05 16:02:01,343] INFO channel is disconnected: [id: 0x2b45ae40, 
> L:/1.2.3.4:40200 ! R:zk.3/5.6.7.8:2182] 
> (org.apache.zookeeper.ClientCnxnSocketNetty){code}
> Pending requests were aborted with a {{CONNECTIONLOSS}} error and the client 
> started waiting on a new connection notification.
> {code:java}
> [2023-03-05 16:02:01,343] INFO [ZooKeeperClient Kafka server] Waiting until 
> connected. (kafka.zookeeper.ZooKeeperClient){code}
> A new connection was created with Zookeeper node 1. Note that this time 
> Kafka reported a valid (new) session ({{0x1006c6e0b830001}}).
> {code:java}
> [2023-03-05 16:02:02,037] INFO Socket connection established, initiating 
> session, client: /1.2.3.4:58080, server: zk.1/9.10.11.12:2182 
> (org.apache.zookeeper.ClientCnxn)
> [2023-03-05 16:02:02,037] INFO channel is connected: [id: 0x68fba106, 
> L:/1.2.3.4:58080 - R:zk.1/9.10.11.12:2182] 
> (org.apache.zookeeper.ClientCnxnSocketNetty)
> [2023-03-05 16:02:03,054] INFO Session establishment complete on server 
> zk.1/9.10.11.12:2182, sessionid = 0x1006c6e0b830001, negotiated timeout = 
> 18000 (org.apache.zookeeper.ClientCnxn){code}
> The Kafka ZK client is notified of the connection.
> {code:java}
> [2023-03-05 16:02:03,054] INFO [ZooKeeperClient Kafka server] Connected. 
> (kafka.zookeeper.ZooKeeperClient){code}
> The broker sends the request to create the znode {{/brokers/ids/18}}, which 
> already exists. The error path implemented for KAFKA-6584 is then followed. 
> However, in this case the session owning the ephemeral node, 
> {{0x300000043230ac1}} ({{216172783240153793}}), differs from the last active 
> Zookeeper session recorded by the broker. It also differs from the current 
> session {{0x1006c6e0b830001}} ({{72176813933264897}}). As a result, the broker 
> does not attempt to recreate its znode.
> {code:java}
> [2023-03-05 16:02:04,466] ERROR Error while creating ephemeral at 
> /brokers/ids/18, node already exists and owner '216172783240153793' does not 
> match current session '72176813933264897' 
> (kafka.zk.KafkaZkClient$CheckedEphemeral)
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
> NodeExists
>         at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:126)
>         at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.getAfterNodeExists(KafkaZkClient.scala:1821)
>         at 
> kafka.zk.KafkaZkClient$CheckedEphemeral.create(KafkaZkClient.scala:1759)
>         at 
> kafka.zk.KafkaZkClient.checkedEphemeralCreate(KafkaZkClient.scala:1726)
>         at kafka.zk.KafkaZkClient.registerBroker(KafkaZkClient.scala:95)
>         at 
> kafka.controller.KafkaController.processRegisterBrokerAndReelect(KafkaController.scala:1810)
>         at 
> kafka.controller.KafkaController.process(KafkaController.scala:1853)
>         at 
> kafka.controller.QueuedEvent.process(ControllerEventManager.scala:51)
>         at 
> kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:127)
>         at 
> kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:130)
>         at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>         at 
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:130)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96){code}
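The NODEEXISTS handling described above comes down to a decision on the ephemeral owner of the existing znode. The Java sketch below is a simplified model based only on this report, not a copy of Kafka's `KafkaZkClient$CheckedEphemeral`. The enum, class and method names are hypothetical. In Kafka, the owner would come from the znode's `Stat#getEphemeralOwner()`.

```java
import java.util.OptionalLong;

// Hypothetical names: a simplified model of the NODEEXISTS handling described in this
// report, not Kafka's actual KafkaZkClient$CheckedEphemeral implementation.
enum NodeExistsOutcome { ALREADY_OURS, DELETE_AND_RECREATE, FAIL }

final class EphemeralOwnerCheck {
    private EphemeralOwnerCheck() {}

    /**
     * @param ephemeralOwner        owner session of the existing znode (Stat#getEphemeralOwner)
     * @param currentSessionId      ID of the session the client currently holds
     * @param lastRecordedSessionId session ID the broker recorded before reconnecting, if any
     */
    static NodeExistsOutcome decide(long ephemeralOwner, long currentSessionId,
                                    OptionalLong lastRecordedSessionId) {
        if (ephemeralOwner == currentSessionId) {
            // An earlier attempt under this very session succeeded but its response was lost.
            return NodeExistsOutcome.ALREADY_OURS;
        }
        if (lastRecordedSessionId.isPresent()
                && ephemeralOwner == lastRecordedSessionId.getAsLong()) {
            // Stale znode left by a session the broker knows about: replace it.
            return NodeExistsOutcome.DELETE_AND_RECREATE;
        }
        // Owner is a session the broker never observed: registration fails with NODEEXISTS.
        return NodeExistsOutcome.FAIL;
    }
}
```

With the values from the error log, `decide(216172783240153793L, 72176813933264897L, OptionalLong.empty())` returns `FAIL`. The recorded session is not visible in the logs, so it is left empty here. Any recorded value other than the owner gives the same result.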
> The session {{0x300000043230ac1}} expires later, as shown in the Zookeeper 
> server logs:
> {code:java}
> [2023-03-05 16:02:21,336] INFO Expiring session 0x300000043230ac1, timeout of 
> 18000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
> [2023-03-05 16:02:21,336] INFO Submitting global closeSession request for 
> session 0x300000043230ac1 (org.apache.zookeeper.server.ZooKeeperServer)
> {code}
> The ephemeral znode is then deleted and never recreated, so the broker is no 
> longer registered. At this point, only a broker restart (or a forced 
> recreation of the Zookeeper session) can remediate the situation.
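The production timeline can be replayed against a toy in-memory model of ephemeral znodes. This shows why the broker ends up unregistered. The model is purely illustrative: `ToyEnsemble` and `PhantomSessionReplay` are hypothetical and are not the ZooKeeper client API. The owner check is reduced to "owner equals current session", because the recorded session did not match either in this case.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of ephemeral znodes, for illustration only (NOT the ZooKeeper API).
final class ToyEnsemble {
    private final Map<String, Long> ephemeralOwners = new HashMap<>();

    // Atomic create: returns false, like NODEEXISTS, when the path is already taken.
    boolean createEphemeral(String path, long sessionId) {
        return ephemeralOwners.putIfAbsent(path, sessionId) == null;
    }

    // Owning session of the znode, or null if it does not exist.
    Long owner(String path) {
        return ephemeralOwners.get(path);
    }

    // Expiring a session deletes every ephemeral znode it owns.
    void expireSession(long sessionId) {
        ephemeralOwners.values().removeIf(owner -> owner == sessionId);
    }
}

final class PhantomSessionReplay {
    static final String PATH = "/brokers/ids/18";
    static final long PHANTOM = 0x300000043230ac1L; // owner the broker never observed
    static final long CURRENT = 0x1006c6e0b830001L; // session the broker reported as established

    final boolean ownerCheckPassed;
    final boolean registeredAtEnd;

    private PhantomSessionReplay(boolean ownerCheckPassed, boolean registeredAtEnd) {
        this.ownerCheckPassed = ownerCheckPassed;
        this.registeredAtEnd = registeredAtEnd;
    }

    static PhantomSessionReplay run() {
        ToyEnsemble zk = new ToyEnsemble();
        // 16:02:02 The Multi is applied under the phantom session.
        zk.createEphemeral(PATH, PHANTOM);
        // 16:02:04 The broker's create under the current session hits NODEEXISTS...
        boolean created = zk.createEphemeral(PATH, CURRENT);
        Long owner = zk.owner(PATH);
        // ...and the owner check fails because the owner is not the current session.
        boolean ownerCheckPassed = created || (owner != null && owner == CURRENT);
        // 16:02:21 The phantom session expires and its ephemeral znode goes with it.
        zk.expireSession(PHANTOM);
        // The broker does not retry, so nothing recreates the znode.
        return new PhantomSessionReplay(ownerCheckPassed, zk.owner(PATH) != null);
    }
}
```

`PhantomSessionReplay.run()` yields `ownerCheckPassed == false` and `registeredAtEnd == false`. This matches the unregistered broker described above.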
> I fail to understand where the session {{0x300000043230ac1}} comes from. An 
> analysis of the Zookeeper commit logs shows the following sequence of 
> transactions, with timestamps taken from the Zookeeper node:
>  - 2023-03-05T16:02:00.973Z: CreateSession [-10] => 0x300000043230ac1
>  - 2023-03-05T16:02:02.163Z: Multi [14] = [CreateNode(/brokers/ids/18), SetData(<BrokerInfo>)]
>  - 2023-03-05T16:02:21.336Z: CloseSession [-11]
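Kafka logs session IDs in decimal, while ZooKeeper logs them in hexadecimal. Converting between the two therefore helps when correlating logs. ZooKeeper's session tracker also stores the `myid` of the server that generated a session ID in the ID's most significant byte. The helper class `SessionIds` below is a hypothetical sketch that decodes both representations.

```java
// Hypothetical helper for correlating Kafka (decimal) and ZooKeeper (hex) session IDs.
final class SessionIds {
    private SessionIds() {}

    // Parses a hex session ID as logged by ZooKeeper ("0x" prefix optional).
    static long parseHex(String hex) {
        String digits = hex.startsWith("0x") ? hex.substring(2) : hex;
        return Long.parseUnsignedLong(digits, 16);
    }

    // Renders a session ID the way ZooKeeper logs it.
    static String toHex(long sessionId) {
        return "0x" + Long.toHexString(sessionId);
    }

    // The most significant byte holds the myid of the server that generated the session ID.
    static int serverId(long sessionId) {
        return (int) (sessionId >>> 56);
    }
}
```

For example, `parseHex("0x300000043230ac1")` returns `216172783240153793`, the owner reported in the Kafka error above, and `serverId` of it returns `3`. The current session `0x1006c6e0b830001` decodes to `72176813933264897` and server `1`. This assumes the `zk.N` hostnames match the servers' `myid` values, which the logs do not confirm. Under that assumption, the phantom session would have been generated by `zk.3`, the node of the first connection whose response the broker never processed.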
> A code-based reproduction of the use case was attempted (available 
> [here|https://github.com/Hangleton/kafka-tools/blob/master/src/main/java/kafka/repro/BrokerRegistrationTest.java]).
> However, it fails to reproduce the "phantom" intermediate session. Instead, 
> the test case forces a session renewal after the Multi transaction is 
> processed for the first time. It essentially tries to produce the following 
> sequence.
> !broker-registration.drawio.png!
> - The ephemeral owner recorded after a successful registration is {{0xAAAAAAAAAAAAAA}}.
> - The session expires and a new one ({{0x1000c7cbe1d0000}}) is created.
> - The broker registration callback is invoked on the controller. The ZK client enqueues the {{Multi}} request.
> - The response is never received by the broker.
> - The client connection times out.
> - The session expires.
> - Upon reconnection, a new session ({{0x1000c7cbe1d0001}}) is created.
> - The ZK client retries upon receiving the {{CONNECTIONLOSS}} response.
> Note that the session is renewed right after the response to the first Multi 
> request is dropped. There is a race condition between the {{AdminZkClient}} 
> and the underlying ZK client. This test controls it explicitly, so that the 
> {{AdminZkClient}} does not see the ZK client as disconnected.
> The fix for KAFKA-6584 does not cover this case because here the session ID 
> is never surfaced to, or recorded by, the ZK client in Kafka. There were no 
> lower-level logs to establish whether the Netty client ever received any 
> response for that session.
> As a remediation, the source of the broker's identity (currently conveyed by 
> the Zookeeper session ID) could perhaps be added explicitly to the znode data. 
> Assuming the Zookeeper Multi is atomic, the znode carries the BrokerInfo (or 
> any other user data provided with the SetData command) if and only if it was 
> successfully created.
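The remediation idea above could look roughly like the Java sketch below. The broker writes a token it owns into the znode data, in the same Multi as the Create. On NODEEXISTS, it compares the token instead of the session ID. Everything in the sketch is hypothetical: the per-process UUID "incarnation", the payload format and the class name. Only the token is modelled; a real payload would be the BrokerInfo plus this token.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

// Hypothetical sketch of the remediation idea: identify the znode's creator by data the
// broker wrote itself, rather than by the (possibly unknown) owning ZooKeeper session.
final class RegistrationIdentity {
    enum Action { OURS_DELETE_AND_RECREATE, CONFLICT }

    private final byte[] token;

    // brokerIncarnation: hypothetical per-process ID, e.g. generated once at broker startup.
    RegistrationIdentity(UUID brokerIncarnation) {
        this.token = brokerIncarnation.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Data to write with SetData in the same Multi as the Create (token only, in this sketch).
    byte[] payload() {
        return token.clone();
    }

    // Called when the Create fails with NODEEXISTS, with the data read from the existing znode.
    Action onNodeExists(byte[] existingData) {
        // If Multi is atomic, existing data is exactly what the creating Multi wrote, so a
        // matching token means one of this broker's own sessions created the znode.
        return Arrays.equals(existingData, token)
                ? Action.OURS_DELETE_AND_RECREATE
                : Action.CONFLICT;
    }
}
```

On a match, the broker would delete the znode, which is owned by the phantom session and would vanish when that session expires. It would then recreate the znode under its current session. A real implementation would likely make that delete conditional on the znode version, to avoid racing with another writer. Using a per-process token, rather than `broker.id`, means a genuine duplicate broker on another host still surfaces as a conflict.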



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
