more tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9385d049
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9385d049
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9385d049

Branch: refs/heads/master
Commit: 9385d0490d8c684a602b4c57489e39941a9a178b
Parents: 75118e4
Author: randgalt <[email protected]>
Authored: Fri Jul 14 17:34:00 2017 -0500
Committer: randgalt <[email protected]>
Committed: Fri Jul 14 17:34:00 2017 -0500

----------------------------------------------------------------------
 .../async/migrations/TestMigrationManager.java | 88 +++++++++++++++++++-
 1 file changed, 87 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/curator/blob/9385d049/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 80a03bb..786e704 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncWrappers;
 import org.apache.curator.x.async.CompletableBaseClassForTests;
 import org.apache.curator.x.async.migrations.models.ModelV1;
 import org.apache.curator.x.async.migrations.models.ModelV2;
@@ -41,12 +42,20 @@ import org.testng.annotations.Test;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TestMigrationManager extends CompletableBaseClassForTests
 {
+    private static final String LOCK_PATH = "/migrations/locks";
+    private static final String META_DATA_PATH = "/migrations/metadata";
     private AsyncCuratorFramework client;
     private ModelSpec<ModelV1> v1Spec;
     private ModelSpec<ModelV2> v2Spec;
@@ -57,6 +66,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     private CuratorOp v2op;
     private CuratorOp v3op;
     private MigrationManager manager;
+    private final AtomicReference<CountDownLatch> filterLatch = new AtomicReference<>();
 
     @BeforeMethod
     @Override
@@ -81,7 +91,27 @@ public class TestMigrationManager extends CompletableBaseClassForTests
         v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
 
         executor = Executors.newCachedThreadPool();
-        manager = new MigrationManager(client, "/migrations/locks", "/migrations/metadata", executor, Duration.ofMinutes(10));
+        manager = new MigrationManager(client, LOCK_PATH, META_DATA_PATH, executor, Duration.ofMinutes(10))
+        {
+            @Override
+            protected List<Migration> filter(MigrationSet set, List<byte[]> operationHashesInOrder) throws MigrationException
+            {
+                CountDownLatch localLatch = filterLatch.getAndSet(null);
+                if ( localLatch != null )
+                {
+                    try
+                    {
+                        localLatch.await();
+                    }
+                    catch ( InterruptedException e )
+                    {
+                        Thread.currentThread().interrupt();
+                        Throwables.propagate(e);
+                    }
+                }
+                return super.filter(set, operationHashesInOrder);
+            }
+        };
         manager.debugCount = new AtomicInteger();
     }
 
@@ -260,4 +290,60 @@ public class TestMigrationManager extends CompletableBaseClassForTests
 
         Assert.assertNull(client.unwrap().checkExists().forPath("/test"));
     }
+
+    @Test
+    public void testConcurrency1() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/test");
+        CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
+        Migration migration = () -> Arrays.asList(op1, op2);
+        MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        CountDownLatch latch = new CountDownLatch(1);
+        filterLatch.set(latch);
+        CompletionStage<Void> first = manager.migrate(migrationSet);
+
+        MigrationManager manager2 = new MigrationManager(client, LOCK_PATH, META_DATA_PATH, executor, Duration.ofMillis(timing.forSleepingABit().milliseconds()));
+        try
+        {
+            complete(manager2.migrate(migrationSet));
+            Assert.fail("Should throw");
+        }
+        catch ( Throwable e )
+        {
+            Assert.assertTrue(Throwables.getRootCause(e) instanceof AsyncWrappers.TimeoutException);
+        }
+
+        latch.countDown();
+        complete(first);
+        Assert.assertEquals(client.unwrap().getData().forPath("/test/bar"), "first".getBytes());
+    }
+
+    @Test
+    public void testConcurrency2() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/test");
+        CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
+        Migration migration = () -> Arrays.asList(op1, op2);
+        MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        CountDownLatch latch = new CountDownLatch(1);
+        filterLatch.set(latch);
+        CompletionStage<Void> first = manager.migrate(migrationSet);
+
+        CompletionStage<Void> second = manager.migrate(migrationSet);
+        try
+        {
+            second.toCompletableFuture().get(timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS);
+            Assert.fail("Should throw");
+        }
+        catch ( Throwable e )
+        {
+            Assert.assertTrue(Throwables.getRootCause(e) instanceof TimeoutException);
+        }
+
+        latch.countDown();
+        complete(first);
+        Assert.assertEquals(client.unwrap().getData().forPath("/test/bar"), "first".getBytes());
+        complete(second);
+        Assert.assertEquals(manager.debugCount.get(), 1);
+    }
 }
