This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4df2593a626 [fix][test] Catch exception when update data in mockZookeeper (#16473)
4df2593a626 is described below
commit 4df2593a62606d3fcfd5ebab8923870814832569
Author: Qiang Zhao <[email protected]>
AuthorDate: Tue Jul 12 22:39:38 2022 +0800
[fix][test] Catch exception when update data in mockZookeeper (#16473)
---
.../java/org/apache/zookeeper/MockZooKeeper.java | 159 ++++++++++++---------
1 file changed, 90 insertions(+), 69 deletions(-)
diff --git a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
index 4e7501c9676..b9325c16102 100644
--- a/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
+++ b/testmocks/src/main/java/org/apache/zookeeper/MockZooKeeper.java
@@ -334,8 +334,8 @@ public class MockZooKeeper extends ZooKeeper {
executor.execute(() -> {
- lock();
try {
+ lock();
if (stopped) {
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx,
null);
@@ -394,7 +394,7 @@ public class MockZooKeeper extends ZooKeeper {
KeeperState.SyncConnected,
parent)));
}
- } catch (Exception ex) {
+ } catch (Throwable ex) {
log.error("create path : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(),
path, ctx, null);
} finally {
@@ -429,28 +429,33 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void getData(final String path, boolean watch, final DataCallback
cb, final Object ctx) {
executor.execute(() -> {
- checkReadOpDelay();
- Optional<KeeperException.Code> failure = programmedFailure(Op.GET,
path);
- if (failure.isPresent()) {
- cb.processResult(failure.get().intValue(), path, ctx, null,
null);
- return;
- } else if (stopped) {
-
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx,
null, null);
- return;
- }
-
- MockZNode value;
- lock();
try {
- value = tree.get(path);
- } finally {
- unlockIfLocked();
- }
+ checkReadOpDelay();
+ Optional<KeeperException.Code> failure =
programmedFailure(Op.GET, path);
+ if (failure.isPresent()) {
+ cb.processResult(failure.get().intValue(), path, ctx,
null, null);
+ return;
+ } else if (stopped) {
+
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx,
null, null);
+ return;
+ }
- if (value == null) {
- cb.processResult(KeeperException.Code.NONODE.intValue(), path,
ctx, null, null);
- } else {
- cb.processResult(0, path, ctx, value.getContent(),
createStatForZNode(value));
+ MockZNode value;
+ lock();
+ try {
+ value = tree.get(path);
+ } finally {
+ unlockIfLocked();
+ }
+
+ if (value == null) {
+ cb.processResult(KeeperException.Code.NONODE.intValue(),
path, ctx, null, null);
+ } else {
+ cb.processResult(0, path, ctx, value.getContent(),
createStatForZNode(value));
+ }
+ } catch (Throwable ex) {
+ log.error("get data : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(),
path, ctx, null, null);
}
});
}
@@ -459,8 +464,8 @@ public class MockZooKeeper extends ZooKeeper {
public void getData(final String path, final Watcher watcher, final
DataCallback cb, final Object ctx) {
executor.execute(() -> {
checkReadOpDelay();
- lock();
try {
+ lock();
Optional<KeeperException.Code> failure =
programmedFailure(Op.GET, path);
if (failure.isPresent()) {
unlockIfLocked();
@@ -485,6 +490,9 @@ public class MockZooKeeper extends ZooKeeper {
unlockIfLocked();
cb.processResult(0, path, ctx, value.getContent(), stat);
}
+ } catch (Throwable ex) {
+ log.error("get data : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(),
path, ctx, null, null);
} finally {
unlockIfLocked();
}
@@ -494,9 +502,9 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void getChildren(final String path, final Watcher watcher, final
ChildrenCallback cb, final Object ctx) {
executor.execute(() -> {
- lock();
List<String> children = Lists.newArrayList();
try {
+ lock();
Optional<KeeperException.Code> failure =
programmedFailure(Op.GET_CHILDREN, path);
if (failure.isPresent()) {
unlockIfLocked();
@@ -532,11 +540,14 @@ public class MockZooKeeper extends ZooKeeper {
if (watcher != null) {
watchers.put(path, watcher);
}
+ cb.processResult(0, path, ctx, children);
+ } catch (Throwable ex) {
+ log.error("get children : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(),
path, ctx, null);
} finally {
unlockIfLocked();
}
- cb.processResult(0, path, ctx, children);
});
}
@@ -606,8 +617,8 @@ public class MockZooKeeper extends ZooKeeper {
public void getChildren(final String path, boolean watcher, final
Children2Callback cb, final Object ctx) {
executor.execute(() -> {
Set<String> children = new TreeSet<>();
- lock();
try {
+ lock();
Optional<KeeperException.Code> failure =
programmedFailure(Op.GET_CHILDREN, path);
if (failure.isPresent()) {
unlockIfLocked();
@@ -633,10 +644,13 @@ public class MockZooKeeper extends ZooKeeper {
String child = relativePath.split("/", 2)[0];
children.add(child);
});
+ cb.processResult(0, path, ctx, new ArrayList<>(children), new
Stat());
+ } catch (Throwable ex) {
+ log.error("get children : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(),
path, ctx, null, null);
} finally {
unlockIfLocked();
}
- cb.processResult(0, path, ctx, new ArrayList<>(children), new
Stat());
});
}
@@ -705,8 +719,8 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void exists(String path, Watcher watcher, StatCallback cb, Object
ctx) {
executor.execute(() -> {
- lock();
try {
+ lock();
Optional<KeeperException.Code> failure =
programmedFailure(Op.EXISTS, path);
if (failure.isPresent()) {
unlockIfLocked();
@@ -729,6 +743,9 @@ public class MockZooKeeper extends ZooKeeper {
unlockIfLocked();
cb.processResult(KeeperException.Code.NONODE.intValue(),
path, ctx, null);
}
+ } catch (Throwable ex) {
+ log.error("exist : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(),
path, ctx, null);
} finally {
unlockIfLocked();
}
@@ -805,56 +822,60 @@ public class MockZooKeeper extends ZooKeeper {
}
executor.execute(() -> {
- final Set<Watcher> toNotify = Sets.newHashSet();
- Stat stat;
- lock();
try {
+ final Set<Watcher> toNotify = Sets.newHashSet();
+ Stat stat;
+ lock();
+ try {
+ Optional<KeeperException.Code> failure =
programmedFailure(Op.SET, path);
+ if (failure.isPresent()) {
+ unlockIfLocked();
+ cb.processResult(failure.get().intValue(), path, ctx,
null);
+ return;
+ } else if (stopped) {
+ unlockIfLocked();
+
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx,
null);
+ return;
+ }
- Optional<KeeperException.Code> failure =
programmedFailure(Op.SET, path);
- if (failure.isPresent()) {
- unlockIfLocked();
- cb.processResult(failure.get().intValue(), path, ctx,
null);
- return;
- } else if (stopped) {
- unlockIfLocked();
-
cb.processResult(KeeperException.Code.CONNECTIONLOSS.intValue(), path, ctx,
null);
- return;
- }
+ if (!tree.containsKey(path)) {
+ unlockIfLocked();
+
cb.processResult(KeeperException.Code.NONODE.intValue(), path, ctx, null);
+ return;
+ }
- if (!tree.containsKey(path)) {
- unlockIfLocked();
- cb.processResult(KeeperException.Code.NONODE.intValue(),
path, ctx, null);
- return;
- }
+ MockZNode mockZNode = tree.get(path);
+ int currentVersion = mockZNode.getVersion();
- MockZNode mockZNode = tree.get(path);
- int currentVersion = mockZNode.getVersion();
+ // Check version
+ if (version != -1 && version != currentVersion) {
+ log.debug("[{}] Current version: {} -- Expected: {}",
path, currentVersion, version);
+ unlockIfLocked();
+
cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
+ return;
+ }
- // Check version
- if (version != -1 && version != currentVersion) {
- log.debug("[{}] Current version: {} -- Expected: {}",
path, currentVersion, version);
+ log.debug("[{}] Updating -- current version: {}", path,
currentVersion);
+ MockZNode newZNode = MockZNode.of(data, currentVersion +
1, mockZNode.getEphemeralOwner());
+ tree.put(path, newZNode);
+ stat = createStatForZNode(newZNode);
+ } finally {
unlockIfLocked();
-
cb.processResult(KeeperException.Code.BADVERSION.intValue(), path, ctx, null);
- return;
}
+ cb.processResult(0, path, ctx, stat);
- log.debug("[{}] Updating -- current version: {}", path,
currentVersion);
- MockZNode newZNode = MockZNode.of(data, currentVersion + 1,
mockZNode.getEphemeralOwner());
- tree.put(path, newZNode);
- stat = createStatForZNode(newZNode);
- } finally {
- unlockIfLocked();
- }
- cb.processResult(0, path, ctx, stat);
+ toNotify.addAll(watchers.get(path));
+ watchers.removeAll(path);
- toNotify.addAll(watchers.get(path));
- watchers.removeAll(path);
+ for (Watcher watcher : toNotify) {
+ watcher.process(new
WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path));
+ }
- for (Watcher watcher : toNotify) {
- watcher.process(new WatchedEvent(EventType.NodeDataChanged,
KeeperState.SyncConnected, path));
+ triggerPersistentWatches(path, null,
EventType.NodeDataChanged);
+ } catch (Throwable ex) {
+ log.error("Update data : {} error", path, ex);
+ cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(),
path, ctx, null);
}
-
- triggerPersistentWatches(path, null, EventType.NodeDataChanged);
});
}
@@ -918,8 +939,8 @@ public class MockZooKeeper extends ZooKeeper {
@Override
public void delete(final String path, int version, final VoidCallback cb,
final Object ctx) {
Runnable r = () -> {
- lock();
try {
+ lock();
final Set<Watcher> toNotifyDelete = Sets.newHashSet();
toNotifyDelete.addAll(watchers.get(path));
@@ -965,7 +986,7 @@ public class MockZooKeeper extends ZooKeeper {
parent)));
triggerPersistentWatches(path, parent,
EventType.NodeDeleted);
}
- } catch (Exception ex) {
+ } catch (Throwable ex) {
log.error("delete path : {} error", path, ex);
cb.processResult(KeeperException.Code.SYSTEMERROR.intValue(),
path, ctx);
} finally {