Repository: curator Updated Branches: refs/heads/CURATOR-397 6485f1650 -> 396d98a51
More tests, refactoring Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/396d98a5 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/396d98a5 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/396d98a5 Branch: refs/heads/CURATOR-397 Commit: 396d98a51495ec1c60156dcd0d6d553644e43689 Parents: 6485f16 Author: randgalt <[email protected]> Authored: Wed May 10 23:38:07 2017 +0200 Committer: randgalt <[email protected]> Committed: Wed May 10 23:38:07 2017 +0200 ---------------------------------------------------------------------- .../modeled/cached/ModeledCacheListener.java | 7 +- .../modeled/TestCachedModeledFramework.java | 122 +++++++++++++++---- .../x/async/modeled/TestModeledFramework.java | 37 +----- .../async/modeled/TestModeledFrameworkBase.java | 64 ++++++++++ 4 files changed, 167 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/396d98a5/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java index 4f1ac70..42498c0 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/cached/ModeledCacheListener.java @@ -40,12 +40,7 @@ public interface ModeledCacheListener<T> /** * A child was removed from the path */ - NODE_REMOVED, - - /** - * Signals that the initial cache has been populated. - */ - INITIALIZED + NODE_REMOVED } /** http://git-wip-us.apache.org/repos/asf/curator/blob/396d98a5/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java index 7be7c28..a9048de 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestCachedModeledFramework.java @@ -18,18 +18,22 @@ */ package org.apache.curator.x.async.modeled; +import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.test.Timing; import org.apache.curator.x.async.modeled.cached.CachedModeledFramework; +import org.apache.curator.x.async.modeled.cached.ModeledCacheListener; import org.apache.curator.x.async.modeled.models.TestModel; import org.testng.Assert; import org.testng.annotations.Test; +import java.io.IOException; import java.math.BigInteger; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicReference; -public class TestCachedModeledFramework extends TestModeledFramework +public class TestCachedModeledFramework extends TestModeledFrameworkBase { @Test public void testThreading() @@ -42,18 +46,25 @@ public class TestCachedModeledFramework extends TestModeledFramework complete(client.set(model)); client.start(); - Assert.assertTrue(new Timing().awaitLatch(latch)); - - AtomicReference<Thread> completionThread = new AtomicReference<>(); - complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null))); - Assert.assertNotNull(completionThread.get()); - Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads"); - completionThread.set(null); - - complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null))); - Assert.assertNotNull(completionThread.get()); - Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads"); - completionThread.set(null); + try + { + Assert.assertTrue(new Timing().awaitLatch(latch)); + + AtomicReference<Thread> completionThread = new AtomicReference<>(); + complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null))); + Assert.assertNotNull(completionThread.get()); + Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads"); + completionThread.set(null); + + complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null))); + Assert.assertNotNull(completionThread.get()); + Assert.assertNotEquals(Thread.currentThread(), completionThread.get(), "Should be different threads"); + completionThread.set(null); + } + finally + { + client.close(); + } } @Test @@ -73,15 +84,84 @@ public class TestCachedModeledFramework extends TestModeledFramework complete(client.set(model)); client.start(); - Assert.assertTrue(new Timing().awaitLatch(latch)); + try + { + Assert.assertTrue(new Timing().awaitLatch(latch)); + + AtomicReference<Thread> completionThread = new AtomicReference<>(); + complete(client.read().thenAccept(s -> completionThread.set(Thread.currentThread()))); + Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread"); + completionThread.set(null); + + complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null))); + Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread"); + completionThread.set(null); + } + finally + { + client.close(); + } + } + + @Test + public void testDownServer() throws IOException + { + Timing timing = new Timing(); + + TestModel model = new TestModel("a", "b", "c", 1, BigInteger.ONE); + CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached(); + Semaphore semaphore = new Semaphore(0); + client.listenable().addListener((t, p, s, m) -> semaphore.release()); - AtomicReference<Thread> completionThread = new AtomicReference<>(); - complete(client.read().whenComplete((s, e) -> completionThread.set((e == null) ? Thread.currentThread() : null))); - Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread"); - completionThread.set(null); + client.start(); + try + { + client.set(model); + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + + CountDownLatch latch = new CountDownLatch(1); + rawClient.getConnectionStateListenable().addListener((__, state) -> { + if ( state == ConnectionState.LOST ) + { + latch.countDown(); + } + }); + server.stop(); + Assert.assertTrue(timing.awaitLatch(latch)); + + complete(client.read().whenComplete((value, e) -> { + Assert.assertNotNull(value); + Assert.assertNull(e); + })); + } + finally + { + client.close(); + } + } + + @Test + public void testPostInitializedFilter() + { + TestModel model1 = new TestModel("a", "b", "c", 1, BigInteger.ONE); + TestModel model2 = new TestModel("d", "e", "e", 1, BigInteger.ONE); + CachedModeledFramework<TestModel> client = ModeledFramework.wrap(async, modelSpec).cached(); + Semaphore semaphore = new Semaphore(0); + ModeledCacheListener<TestModel> listener = (t, p, s, m) -> semaphore.release(); + client.listenable().addListener(listener.postInitializedOnly()); + + complete(client.at("1").set(model1)); // set before cache is started + client.start(); + try + { + Assert.assertFalse(timing.forSleepingABit().acquireSemaphore(semaphore)); - complete(client.at("foo").read().whenComplete((v, e) -> completionThread.set((e != null) ? Thread.currentThread() : null))); - Assert.assertEquals(ourThread.get(), completionThread.get(), "Should be our thread"); - completionThread.set(null); + client.at("2").set(model2); // set before cache is started + Assert.assertTrue(timing.acquireSemaphore(semaphore)); + } + finally + { + client.close(); + } } } http://git-wip-us.apache.org/repos/asf/curator/blob/396d98a5/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java index 98d5ee1..4a08a2b 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java @@ -25,57 +25,22 @@ import org.apache.curator.framework.schema.Schema; import org.apache.curator.framework.schema.SchemaSet; import org.apache.curator.framework.schema.SchemaViolation; import org.apache.curator.retry.RetryOneTime; -import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.AsyncStage; -import org.apache.curator.x.async.CompletableBaseClassForTests; import org.apache.curator.x.async.modeled.models.TestModel; import org.apache.curator.x.async.modeled.models.TestNewerModel; import org.apache.curator.x.async.modeled.versioned.Versioned; import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework; import org.apache.zookeeper.KeeperException; import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.math.BigInteger; import java.util.Collections; import java.util.Set; import java.util.concurrent.CountDownLatch; -public class TestModeledFramework extends CompletableBaseClassForTests +public class TestModeledFramework extends TestModeledFrameworkBase { - protected static final ZPath path = ZPath.parse("/test/path"); - protected CuratorFramework rawClient; - protected ModelSpec<TestModel> modelSpec; - protected ModelSpec<TestNewerModel> newModelSpec; - protected AsyncCuratorFramework async; - - @BeforeMethod - @Override - public void setup() throws Exception - { - super.setup(); - - rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); - rawClient.start(); - async = AsyncCuratorFramework.wrap(rawClient); - - JacksonModelSerializer<TestModel> serializer = JacksonModelSerializer.build(TestModel.class); - JacksonModelSerializer<TestNewerModel> newSerializer = JacksonModelSerializer.build(TestNewerModel.class); - - modelSpec = ModelSpec.builder(path, serializer).build(); - newModelSpec = ModelSpec.builder(path, newSerializer).build(); - } - - @AfterMethod - @Override - public void teardown() throws Exception - { - CloseableUtils.closeQuietly(rawClient); - super.teardown(); - } - @Test public void testCrud() { http://git-wip-us.apache.org/repos/asf/curator/blob/396d98a5/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java new file mode 100644 index 0000000..61a4570 --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.curator.x.async.CompletableBaseClassForTests; +import org.apache.curator.x.async.modeled.models.TestModel; +import org.apache.curator.x.async.modeled.models.TestNewerModel; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; + +public class TestModeledFrameworkBase extends CompletableBaseClassForTests +{ + protected static final ZPath path = ZPath.parse("/test/path"); + protected CuratorFramework rawClient; + protected ModelSpec<TestModel> modelSpec; + protected ModelSpec<TestNewerModel> newModelSpec; + protected AsyncCuratorFramework async; + + @BeforeMethod + @Override + public void setup() throws Exception + { + super.setup(); + + rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + rawClient.start(); + async = AsyncCuratorFramework.wrap(rawClient); + + JacksonModelSerializer<TestModel> serializer = JacksonModelSerializer.build(TestModel.class); + JacksonModelSerializer<TestNewerModel> newSerializer = JacksonModelSerializer.build(TestNewerModel.class); + + modelSpec = ModelSpec.builder(path, serializer).build(); + newModelSpec = ModelSpec.builder(path, newSerializer).build(); + } + + @AfterMethod + @Override + public void teardown() throws Exception + { + CloseableUtils.closeQuietly(rawClient); + super.teardown(); + } +}
