zhaizhibo opened a new pull request, #25237: URL: https://github.com/apache/pulsar/pull/25237
### Motivation

When using PulsarZooKeeperClient with sessionWatcher enabled, a race condition during ZooKeeper session reconnection can cause operations to fail with SESSIONEXPIRED errors even after the session has been successfully re-established. It can also cause the notification for the SessionReestablished event to be lost.

Suppose that in ZKSessionWatcher, currentStatus = SessionEvent.SessionLost. The client then reconnects to ZooKeeper, which triggers ZKSessionWatcher::checkState(SyncConnected). The following interleaving can occur:

| thread 1 | thread 2 | thread 3 |
|-------|-------|-------|
| sessionListener.accept(SessionReestablished) | | |
| ZKMetadataStore::receivedSessionEvent | | |
| currentStatus = SessionEvent.SessionReestablished | | |
| | addWatcher failed because the old zk client has expired | |
| | sessionWatcher.setSessionInvalid(), as currentStatus = SessionEvent.SessionLost | |
| | | trigger PulsarZooKeeperClient.clientCreator, so the old zk client is closed |
| sessionListener.accept(SessionReestablished) | | |
| ZKMetadataStore::receivedSessionEvent | | |
| addWatcher failed because the old zk client has been closed; it fails in the same thread (org.apache.zookeeper.ClientCnxn#conLossPacket) | | |
| sessionWatcher.setSessionInvalid(), as currentStatus = SessionEvent.SessionLost | | |
| currentStatus = SessionEvent.SessionReestablished | | |
| | | create a new zk client and set it in PulsarZooKeeperClient::zk |

After this, because the state has already been updated to SessionReestablished, the next session state check will no longer trigger sessionListener.accept(SessionEvent.SessionReestablished).

### Modifications

1. Set currentStatus to SessionReestablished before invoking sessionListener.accept(SessionEvent.SessionReestablished), ensuring that this state update happens-before any subsequent modifications.
2. Introduce a read-write lock to synchronize updates to the ZooKeeper client.

### Does this pull request potentially affect one of the following parts:

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY.
-->

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
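
For readers who want a concrete picture of the two items under Modifications, here is a minimal sketch. It is not the code from the PR. The names `SessionReconnectSketch`, `Watcher`, `ClientHolder`, `onSyncConnected`, `use`, and `replace` are hypothetical, and the sketch assumes that `setSessionInvalid()` simply resets the status to `SessionLost`. It shows the status being published before the listener runs, so that an invalidation raised from inside the listener is not overwritten afterwards, and a read-write lock guarding replacement of the client reference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch; these are not Pulsar's actual classes. */
public class SessionReconnectSketch {

    enum SessionEvent { SessionLost, SessionReestablished }

    /** Illustrative stand-in for the session watcher's state handling. */
    static class Watcher {
        private SessionEvent currentStatus = SessionEvent.SessionLost;
        private final Consumer<SessionEvent> sessionListener;

        Watcher(Consumer<SessionEvent> sessionListener) {
            this.sessionListener = sessionListener;
        }

        synchronized void onSyncConnected() {
            if (currentStatus == SessionEvent.SessionLost) {
                // Publish the new state first ...
                currentStatus = SessionEvent.SessionReestablished;
                // ... then notify. If the listener calls setSessionInvalid(),
                // that update is applied last and is therefore preserved.
                sessionListener.accept(SessionEvent.SessionReestablished);
            }
        }

        // Assumption: invalidating the session resets the status to SessionLost.
        synchronized void setSessionInvalid() {
            currentStatus = SessionEvent.SessionLost;
        }

        synchronized SessionEvent status() {
            return currentStatus;
        }
    }

    /** Guards a replaceable client reference with a read-write lock. */
    static class ClientHolder<C> {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private C client;

        ClientHolder(C initial) {
            this.client = initial;
        }

        // Operations share the read lock, so they may run concurrently.
        <R> R use(Function<C, R> op) {
            lock.readLock().lock();
            try {
                return op.apply(client);
            } finally {
                lock.readLock().unlock();
            }
        }

        // Replacing the client takes the write lock, excluding all readers.
        void replace(Supplier<C> creator) {
            lock.writeLock().lock();
            try {
                client = creator.get();
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

    public static void main(String[] args) {
        List<SessionEvent> seen = new ArrayList<>();
        Watcher[] ref = new Watcher[1];
        // The first notification simulates a failed addWatcher that invalidates the session.
        ref[0] = new Watcher(e -> {
            seen.add(e);
            if (seen.size() == 1) {
                ref[0].setSessionInvalid();
            }
        });
        ref[0].onSyncConnected();
        ref[0].onSyncConnected();
        System.out.println("notifications delivered: " + seen.size());
        System.out.println("final status: " + ref[0].status());
    }
}
```

In this sketch the second `onSyncConnected()` call still delivers a notification, because the invalidation raised inside the first listener call was not overwritten. With the original ordering (notify first, then assign), the status would end up as `SessionReestablished` and the second notification would be skipped.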
