Repository: curator Updated Branches: refs/heads/CURATOR-33 f4743336e -> 267492779
Fix potential race condition. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/26749277 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/26749277 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/26749277 Branch: refs/heads/CURATOR-33 Commit: 267492779ac127d22c67791c799b73d46dfeac7a Parents: f474333 Author: Scott Blum <[email protected]> Authored: Fri Aug 1 15:23:16 2014 -0400 Committer: Scott Blum <[email protected]> Committed: Fri Aug 1 15:28:15 2014 -0400 ---------------------------------------------------------------------- .../framework/recipes/cache/TreeCache.java | 58 +++++++++++++------- 1 file changed, 39 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/26749277/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java index f73861d..0d5995a 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/cache/TreeCache.java @@ -71,14 +71,14 @@ public class TreeCache implements Closeable private final class TreeNode implements Watcher, BackgroundCallback { - private final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING); - private final String path; - private final TreeNode parent; - private final AtomicReference<Stat> stat = new AtomicReference<Stat>(); - private final AtomicReference<byte[]> data = new AtomicReference<byte[]>(); - private final AtomicReference<ConcurrentMap<String, TreeNode>> children = new 
AtomicReference<ConcurrentMap<String, TreeNode>>(); - - private TreeNode(String path, TreeNode parent) + final AtomicReference<NodeState> nodeState = new AtomicReference<NodeState>(NodeState.PENDING); + final TreeNode parent; + final String path; + final AtomicReference<Stat> stat = new AtomicReference<Stat>(); + final AtomicReference<byte[]> data = new AtomicReference<byte[]>(); + final AtomicReference<ConcurrentMap<String, TreeNode>> children = new AtomicReference<ConcurrentMap<String, TreeNode>>(); + + TreeNode(String path, TreeNode parent) { this.path = path; this.parent = parent; @@ -86,19 +86,30 @@ public class TreeCache implements Closeable private void refresh() throws Exception { - refreshData(); - refreshChildren(); + outstandingOps.addAndGet(2); + doRefreshData(); + doRefreshChildren(); } private void refreshChildren() throws Exception { outstandingOps.incrementAndGet(); - client.getChildren().usingWatcher(this).inBackground(this).forPath(path); + doRefreshChildren(); } private void refreshData() throws Exception { outstandingOps.incrementAndGet(); + doRefreshData(); + } + + private void doRefreshChildren() throws Exception + { + client.getChildren().usingWatcher(this).inBackground(this).forPath(path); + } + + private void doRefreshData() throws Exception + { if ( dataIsCompressed ) { client.getData().decompressed().usingWatcher(this).inBackground(this).forPath(path); @@ -109,7 +120,7 @@ public class TreeCache implements Closeable } } - private void wasReconnected() throws Exception + void wasReconnected() throws Exception { refresh(); ConcurrentMap<String, TreeNode> childMap = children.get(); @@ -122,12 +133,12 @@ public class TreeCache implements Closeable } } - private void wasCreated() throws Exception + void wasCreated() throws Exception { refresh(); } - private void wasDeleted() throws Exception + void wasDeleted() throws Exception { stat.set(null); data.set(null); @@ -200,6 +211,7 @@ public class TreeCache implements Closeable @Override public 
void processResult(CuratorFramework client, CuratorEvent event) throws Exception { + Stat newStat = event.getStat(); switch ( event.getType() ) { case EXISTS: @@ -217,7 +229,12 @@ public class TreeCache implements Closeable case CHILDREN: if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { - stat.set(event.getStat()); + Stat oldStat = stat.get(); + if (oldStat != null && oldStat.getMzxid() == newStat.getMzxid()) { + // Only update stat if mzxid is the same, otherwise we might obscure + // GET_DATA event updates. + stat.set(newStat); + } if ( event.getChildren().isEmpty() ) { @@ -263,19 +280,22 @@ public class TreeCache implements Closeable case GET_DATA: if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { - Stat oldStat = stat.getAndSet(event.getStat()); if ( cacheData ) { data.set(event.getData()); } + Stat oldStat = stat.getAndSet(newStat); if ( nodeState.compareAndSet(NodeState.PENDING, NodeState.LIVE) ) { - publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(), event.getStat(), event.getData())); + publishEvent(TreeCacheEvent.Type.NODE_ADDED, new ChildData(event.getPath(), newStat, event.getData())); } - else if ( oldStat.getMzxid() != event.getStat().getMzxid() ) + else { - publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(), event.getStat(), event.getData())); + if ( oldStat == null || oldStat.getMzxid() != newStat.getMzxid() ) + { + publishEvent(TreeCacheEvent.Type.NODE_UPDATED, new ChildData(event.getPath(), newStat, event.getData())); + } } } else if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
