oneby-wang opened a new pull request, #25910:
URL: https://github.com/apache/pulsar/pull/25910
### Motivation
`ZKSessionTest.testReacquireLeadershipAfterSessionLost` can observe unstable
metadata session events after a ZooKeeper session expires and
`PulsarZooKeeperClient` creates a replacement client.
Failing test case 1:
```
java.lang.AssertionError: expected [SessionReestablished] but found [ConnectionLost]
    at org.testng.Assert.fail(Assert.java:111)
    at org.testng.Assert.failNotEquals(Assert.java:1590)
    at org.testng.Assert.assertEqualsImpl(Assert.java:150)
    at org.testng.Assert.assertEquals(Assert.java:132)
    at org.testng.Assert.assertEquals(Assert.java:644)
    at org.apache.pulsar.metadata.ZKSessionTest.testReacquireLeadershipAfterSessionLost(ZKSessionTest.java:230)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
    at java.base/java.lang.reflect.Method.invoke(Method.java:580)
    at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:141)
    at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
    at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
    at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
    at java.base/java.lang.Thread.run(Thread.java:1583)
```
Failing test case 2:
```
java.util.concurrent.CompletionException: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
    at java.base/java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:781)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194)
    at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$7(ZKMetadataStore.java:253)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:253)
    at org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:524)
    at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:702)
    at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:541)
Caused by: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
    at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:528)
    at org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:252)
    ... 3 more
Caused by: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:133)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:53)
    at org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:518)
    ... 4 more
Cause 1: org.apache.pulsar.metadata.api.MetadataStoreException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
    at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:528)
    at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.lambda$batchOperation$9(ZKMetadataStore.java:252)
    at app//org.apache.pulsar.metadata.impl.PulsarZooKeeperClient$3$1.processResult(PulsarZooKeeperClient.java:524)
    at app//org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:702)
    at app//org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:541)
Caused by: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
    at app//org.apache.zookeeper.KeeperException.create(KeeperException.java:133)
    at app//org.apache.zookeeper.KeeperException.create(KeeperException.java:53)
    at app//org.apache.pulsar.metadata.impl.ZKMetadataStore.getException(ZKMetadataStore.java:518)
    ... 4 more
```
The race happens during the handoff from the expired ZooKeeper instance to
the new one:
- `ZooKeeper` can deliver `SyncConnected` while the new client is still
being constructed.
- `ZooKeeperWatcherBase` forwards that session event to child watchers.
- Those child watchers can run before `PulsarZooKeeperClient` publishes the
new `ZooKeeper` handle.
- During that window, follow-up operations can still be routed to the old
expired handle, and an old async session probe can later overwrite the state of
the newly established session.
This can produce extra or incomplete session transitions around
`ConnectionLost`, `SessionLost`, `Reconnected`, and `SessionReestablished`.
### Modifications
This change keeps the reconnect flow local to `PulsarZooKeeperClient` and
`ZKSessionWatcher`.
- `PulsarZooKeeperClient` now creates replacement ZooKeeper clients with a
forwarding watcher instead of passing `watcherManager` directly.
- The forwarding watcher waits until the new ZooKeeper handle has been
published before forwarding events to `watcherManager`.
- The new handle is published before releasing the forwarding watcher, and
`waitForConnection()` runs after that release because it depends on the
forwarded `SyncConnected` event.
- `ZKSessionWatcher` records the session id used for its async `exists("/")`
probe and only applies the probe result if the current session id still matches.
- The session-id check and state transition are guarded by the same
synchronized section so a stale probe cannot race with a new-session event and
overwrite the new session state.
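
For reviewers, the two mechanisms above can be modelled in isolation roughly as follows. This is a simplified sketch, not the code in this PR: `ReconnectSketch`, `GatedForwarder`, and `SessionState` are hypothetical names, events and states are plain strings, and no ZooKeeper classes are involved. It assumes the forwarding watcher blocks on a latch until the handle is published, and that the probe result is dropped when the session id no longer matches.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

public class ReconnectSketch {

    /** Hypothetical stand-in for the forwarding watcher: parks events until release(). */
    static final class GatedForwarder<E> implements Consumer<E> {
        private final CountDownLatch published = new CountDownLatch(1);
        private final Consumer<E> delegate;

        GatedForwarder(Consumer<E> delegate) {
            this.delegate = delegate;
        }

        /** Called once the replacement handle is visible to other threads. */
        void release() {
            published.countDown();
        }

        @Override
        public void accept(E event) {
            try {
                published.await(); // wait until the new handle has been published
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // this sketch simply drops the event when interrupted
            }
            delegate.accept(event);
        }
    }

    /** Hypothetical stand-in for the session watcher state. */
    static final class SessionState {
        private long currentSessionId;
        private String state = "Connected";

        /** A session event for a new session updates id and state together. */
        synchronized void onNewSession(long sessionId, String newState) {
            currentSessionId = sessionId;
            state = newState;
        }

        /** Id check and state transition share one lock, so a stale probe cannot win. */
        synchronized boolean applyProbeResult(long probeSessionId, String probedState) {
            if (probeSessionId != currentSessionId) {
                return false; // probe was issued against an older session
            }
            state = probedState;
            return true;
        }

        synchronized String state() {
            return state;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        GatedForwarder<String> gate = new GatedForwarder<>(seen::add);

        // The event arrives on another thread while the handle is still unpublished.
        Thread eventThread = new Thread(() -> gate.accept("SyncConnected"));
        eventThread.start();
        System.out.println("before release: " + seen);

        gate.release(); // publish the handle first, then let events through
        eventThread.join();
        System.out.println("after release: " + seen);

        SessionState session = new SessionState();
        session.onNewSession(1L, "Connected");
        session.onNewSession(2L, "SessionReestablished");
        // A probe started under session 1 completes late and must be ignored.
        System.out.println("stale probe applied: " + session.applyProbeResult(1L, "ConnectionLost"));
        System.out.println("state: " + session.state());
    }
}
```

The latch keeps ordering simple: nothing reaches the delegate before `release()`, which mirrors why `waitForConnection()` has to run after the release rather than before it.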
### Verifying this change
- [x] Make sure that the change passes the CI checks.
### Does this pull request potentially affect one of the following parts:
*If the box was checked, please highlight the changes*
- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment