This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1eab46fb582655c9717f6f9783eebdb82e3b83cb Author: Qiang Zhao <[email protected]> AuthorDate: Tue Jul 12 22:39:38 2022 +0800 [fix][test] Catch exception when update data in mockZookeeper (#16473) (cherry picked from commit 4df2593a62606d3fcfd5ebab8923870814832569) --- .../java/org/apache/zookeeper/MockZooKeeper.java | 161 ++++++++++++--------- 1 file changed, 94 insertions(+), 67 deletions(-) diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java index cd0c60c0087..b9325c16102 100644 --- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java +++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java @@ -334,8 +334,8 @@ public class MockZooKeeper extends ZooKeeper { executor.execute(() -> { - lock(); try { + lock(); if (stopped) { cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); @@ -394,6 +394,9 @@ public class MockZooKeeper extends ZooKeeper { KeeperState.SyncConnected, parent))); } + } catch (Throwable ex) { + log.error("create path : {} error", path, ex); + cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null); } finally { unlockIfLocked(); } @@ -426,28 +429,33 @@ public class MockZooKeeper extends ZooKeeper { @Override public void getData(final String path, boolean watch, final DataCallback cb, final Object ctx) { executor.execute(() -> { - checkReadOpDelay(); - Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path); - if (failure.isPresent()) { - cb.processResult(failure.get().intValue(), path, ctx, null, null); - return; - } else if (stopped) { - cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null); - return; - } - - MockZNode value; - lock(); try { - value = tree.get(path); - } finally { - unlockIfLocked(); - } + checkReadOpDelay(); + Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path); + if (failure.isPresent()) { 
+ cb.processResult(failure.get().intValue(), path, ctx, null, null); + return; + } else if (stopped) { + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null, null); + return; + } - if (value == null) { - cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null); - } else { - cb.processResult(0, path, ctx, value.getContent(), createStatForZNode(value)); + MockZNode value; + lock(); + try { + value = tree.get(path); + } finally { + unlockIfLocked(); + } + + if (value == null) { + cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null, null); + } else { + cb.processResult(0, path, ctx, value.getContent(), createStatForZNode(value)); + } + } catch (Throwable ex) { + log.error("get data : {} error", path, ex); + cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null); } }); } @@ -456,8 +464,8 @@ public class MockZooKeeper extends ZooKeeper { public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object ctx) { executor.execute(() -> { checkReadOpDelay(); - lock(); try { + lock(); Optional<KeeperException.Code> failure = programmedFailure(Op.GET, path); if (failure.isPresent()) { unlockIfLocked(); @@ -482,6 +490,9 @@ public class MockZooKeeper extends ZooKeeper { unlockIfLocked(); cb.processResult(0, path, ctx, value.getContent(), stat); } + } catch (Throwable ex) { + log.error("get data : {} error", path, ex); + cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null); } finally { unlockIfLocked(); } @@ -491,9 +502,9 @@ public class MockZooKeeper extends ZooKeeper { @Override public void getChildren(final String path, final Watcher watcher, final ChildrenCallback cb, final Object ctx) { executor.execute(() -> { - lock(); List<String> children = Lists.newArrayList(); try { + lock(); Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path); if (failure.isPresent()) { unlockIfLocked(); @@ 
-529,11 +540,14 @@ public class MockZooKeeper extends ZooKeeper { if (watcher != null) { watchers.put(path, watcher); } + cb.processResult(0, path, ctx, children); + } catch (Throwable ex) { + log.error("get children : {} error", path, ex); + cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null); } finally { unlockIfLocked(); } - cb.processResult(0, path, ctx, children); }); } @@ -603,8 +617,8 @@ public class MockZooKeeper extends ZooKeeper { public void getChildren(final String path, boolean watcher, final Children2Callback cb, final Object ctx) { executor.execute(() -> { Set<String> children = new TreeSet<>(); - lock(); try { + lock(); Optional<KeeperException.Code> failure = programmedFailure(Op.GET_CHILDREN, path); if (failure.isPresent()) { unlockIfLocked(); @@ -630,10 +644,13 @@ public class MockZooKeeper extends ZooKeeper { String child = relativePath.split("/", 2)[0]; children.add(child); }); + cb.processResult(0, path, ctx, new ArrayList<>(children), new Stat()); + } catch (Throwable ex) { + log.error("get children : {} error", path, ex); + cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null, null); } finally { unlockIfLocked(); } - cb.processResult(0, path, ctx, new ArrayList<>(children), new Stat()); }); } @@ -702,8 +719,8 @@ public class MockZooKeeper extends ZooKeeper { @Override public void exists(String path, Watcher watcher, StatCallback cb, Object ctx) { executor.execute(() -> { - lock(); try { + lock(); Optional<KeeperException.Code> failure = programmedFailure(Op.EXISTS, path); if (failure.isPresent()) { unlockIfLocked(); @@ -726,6 +743,9 @@ public class MockZooKeeper extends ZooKeeper { unlockIfLocked(); cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null); } + } catch (Throwable ex) { + log.error("exist : {} error", path, ex); + cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null); } finally { unlockIfLocked(); } @@ -802,56 +822,60 @@ public class 
MockZooKeeper extends ZooKeeper { } executor.execute(() -> { - final Set<Watcher> toNotify = Sets.newHashSet(); - Stat stat; - lock(); try { + final Set<Watcher> toNotify = Sets.newHashSet(); + Stat stat; + lock(); + try { + Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path); + if (failure.isPresent()) { + unlockIfLocked(); + cb.processResult(failure.get().intValue(), path, ctx, null); + return; + } else if (stopped) { + unlockIfLocked(); + cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); + return; + } - Optional<KeeperException.Code> failure = programmedFailure(Op.SET, path); - if (failure.isPresent()) { - unlockIfLocked(); - cb.processResult(failure.get().intValue(), path, ctx, null); - return; - } else if (stopped) { - unlockIfLocked(); - cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx, null); - return; - } + if (!tree.containsKey(path)) { + unlockIfLocked(); + cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null); + return; + } - if (!tree.containsKey(path)) { - unlockIfLocked(); - cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null); - return; - } + MockZNode mockZNode = tree.get(path); + int currentVersion = mockZNode.getVersion(); - MockZNode mockZNode = tree.get(path); - int currentVersion = mockZNode.getVersion(); + // Check version + if (version != -1 && version != currentVersion) { + log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version); + unlockIfLocked(); + cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null); + return; + } - // Check version - if (version != -1 && version != currentVersion) { - log.debug("[{}] Current version: {} -- Expected: {}", path, currentVersion, version); + log.debug("[{}] Updating -- current version: {}", path, currentVersion); + MockZNode newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner()); + tree.put(path, newZNode); + 
stat = createStatForZNode(newZNode); + } finally { unlockIfLocked(); - cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null); - return; } + cb.processResult(0, path, ctx, stat); - log.debug("[{}] Updating -- current version: {}", path, currentVersion); - MockZNode newZNode = MockZNode.of(data, currentVersion + 1, mockZNode.getEphemeralOwner()); - tree.put(path, newZNode); - stat = createStatForZNode(newZNode); - } finally { - unlockIfLocked(); - } - cb.processResult(0, path, ctx, stat); + toNotify.addAll(watchers.get(path)); + watchers.removeAll(path); - toNotify.addAll(watchers.get(path)); - watchers.removeAll(path); + for (Watcher watcher : toNotify) { + watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path)); + } - for (Watcher watcher : toNotify) { - watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path)); + triggerPersistentWatches(path, null, EventType.NodeDataChanged); + } catch (Throwable ex) { + log.error("Update data : {} error", path, ex); + cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx, null); } - - triggerPersistentWatches(path, null, EventType.NodeDataChanged); }); } @@ -915,8 +939,8 @@ public class MockZooKeeper extends ZooKeeper { @Override public void delete(final String path, int version, final VoidCallback cb, final Object ctx) { Runnable r = () -> { - lock(); try { + lock(); final Set<Watcher> toNotifyDelete = Sets.newHashSet(); toNotifyDelete.addAll(watchers.get(path)); @@ -962,6 +986,9 @@ public class MockZooKeeper extends ZooKeeper { parent))); triggerPersistentWatches(path, parent, EventType.NodeDeleted); } + } catch (Throwable ex) { + log.error("delete path : {} error", path, ex); + cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(), path, ctx); } finally { unlockIfLocked(); }
