Repository: curator Updated Branches: refs/heads/CURATOR-161 ba4da2c3c -> a47c03671
CURATOR-161 - Fixed up some potential race conditions with unit tests. Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a47c0367 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a47c0367 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a47c0367 Branch: refs/heads/CURATOR-161 Commit: a47c03671452a20e70deb2b8668d70bd92d78ce7 Parents: ba4da2c Author: Cameron McKenzie <[email protected]> Authored: Thu May 14 09:33:22 2015 +1000 Committer: Cameron McKenzie <[email protected]> Committed: Thu May 14 09:33:22 2015 +1000 ---------------------------------------------------------------------- .../framework/imps/TestRemoveWatches.java | 55 ++++++++++++++++---- 1 file changed, 46 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/a47c0367/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java index fc15f0c..49f34ea 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestRemoveWatches.java @@ -4,6 +4,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -30,23 +31,51 @@ import org.testng.annotations.Test; public class TestRemoveWatches extends BaseClassForTests { - private boolean blockUntilDesiredConnectionState(CuratorFramework client, Timing timing, final ConnectionState desiredState) + private AtomicReference<ConnectionState> registerConnectionStateListener(CuratorFramework client) { - final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference<ConnectionState> state = new AtomicReference<ConnectionState>(); client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { - if(newState == desiredState) + state.set(newState); + synchronized(state) { - latch.countDown(); + state.notify(); } } }); - return timing.awaitLatch(latch); + return state; + } + + private boolean blockUntilDesiredConnectionState(AtomicReference<ConnectionState> stateRef, Timing timing, final ConnectionState desiredState) + { + if(stateRef.get() == desiredState) + { + return true; + } + + synchronized(stateRef) + { + if(stateRef.get() == desiredState) + { + return true; + } + + try + { + stateRef.wait(timing.milliseconds()); + return stateRef.get() == desiredState; + } + catch(InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + } } @Test @@ -337,6 +366,8 @@ public class TestRemoveWatches extends BaseClassForTests { client.start(); + AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client); + final String path = "/"; final CountDownLatch removedLatch = new CountDownLatch(1); @@ -348,7 +379,7 @@ public class TestRemoveWatches extends BaseClassForTests //Stop the server so we can check if we can remove watches locally when offline server.stop(); - blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED); + Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED)); client.watches().removeAll().locally().forPath(path); @@ -371,6 +402,8 @@ public class TestRemoveWatches extends BaseClassForTests { client.start(); + AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client); + final String path = "/"; final CountDownLatch removedLatch = new CountDownLatch(1); @@ -382,7 +415,7 @@ public class TestRemoveWatches extends BaseClassForTests //Stop the server so we can check if we can remove watches locally when offline server.stop(); - blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED); + Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED)); client.watches().removeAll().locally().inBackground().forPath(path); @@ -470,6 +503,8 @@ public class TestRemoveWatches extends BaseClassForTests try { client.start(); + + AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client); String path = "/"; @@ -480,7 +515,7 @@ public class TestRemoveWatches extends BaseClassForTests server.stop(); - blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED); + Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED)); //Remove the watch while we're not connected try @@ -511,6 +546,8 @@ public class TestRemoveWatches extends BaseClassForTests try { client.start(); + + AtomicReference<ConnectionState> stateRef = registerConnectionStateListener(client); final CountDownLatch guaranteeAddedLatch = new CountDownLatch(1); @@ -533,7 +570,7 @@ public class TestRemoveWatches extends BaseClassForTests client.checkExists().usingWatcher(watcher).forPath(path); server.stop(); - blockUntilDesiredConnectionState(client, timing, ConnectionState.SUSPENDED); + Assert.assertTrue(blockUntilDesiredConnectionState(stateRef, timing, ConnectionState.SUSPENDED)); //Remove the watch while we're not connected client.watches().remove(watcher).guaranteed().inBackground().forPath(path);
