Repository: incubator-tephra Updated Branches: refs/heads/master 307a585cb -> 67eaa768c
Fix flaky test (PooledClientProviderTest). Previously, the entire client pool was not exhausted. This closes #8 Signed-off-by: poorna <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/67eaa768 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/67eaa768 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/67eaa768 Branch: refs/heads/master Commit: 67eaa768c9e9ff117315613eabbcf8219f25cfde Parents: 307a585 Author: Ali Anwar <[email protected]> Authored: Thu Sep 8 23:34:44 2016 -0700 Committer: poorna <[email protected]> Committed: Fri Sep 9 00:55:16 2016 -0700 ---------------------------------------------------------------------- .../distributed/PooledClientProviderTest.java | 44 ++++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/67eaa768/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java b/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java index 507cefb..dce8078 100644 --- a/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java +++ b/tephra-core/src/test/java/org/apache/tephra/distributed/PooledClientProviderTest.java @@ -135,30 +135,31 @@ public class PooledClientProviderTest { } }); - //Now race to get MAX_CLIENT_COUNT+1 clients, exhausting the pool and requesting 1 more. - List<Future<Integer>> clientIds = new ArrayList<Future<Integer>>(); - CountDownLatch countDownLatch = new CountDownLatch(1); + List<Future<Integer>> clientIds = new ArrayList<>(); + // We want to ensure that all clients have been exhausted before releasing any. + // Only once all the clients are fetched from the pool, will any be released. The last thread will reuse one of + // these clients from the pool. + CountDownLatch clientDoneLatch = new CountDownLatch(MAX_CLIENT_COUNT); ExecutorService executor = Executors.newFixedThreadPool(MAX_CLIENT_COUNT + 1); for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) { - clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT / 2, countDownLatch))); + clientIds.add(executor.submit(new RetrieveClient(clientProvider, clientDoneLatch))); } - countDownLatch.countDown(); - Set<Integer> ids = new HashSet<Integer>(); + Set<Integer> ids = new HashSet<>(); for (Future<Integer> id : clientIds) { ids.add(id.get()); } Assert.assertEquals(MAX_CLIENT_COUNT, ids.size()); - // now, try it again with, where each thread holds onto the client for twice the client.obtain.timeout value. - // one of the threads should throw a TimeOutException, because the other threads don't release their clients - // within the configured timeout. - countDownLatch = new CountDownLatch(1); + // Now, try it again with, with a countdown latch equal to the number of threads. All of them will only progress + // past it, once they all acquire a client or time out while attempting to obtain one. + // One of the threads should throw a TimeOutException, because the other threads don't release their clients + // until then and the client thread pool isn't enough for the number of threads. + clientDoneLatch = new CountDownLatch(MAX_CLIENT_COUNT + 1); for (int i = 0; i < MAX_CLIENT_COUNT + 1; i++) { - clientIds.add(executor.submit(new RetrieveClient(clientProvider, CLIENT_OBTAIN_TIMEOUT * 2, countDownLatch))); + clientIds.add(executor.submit(new RetrieveClient(clientProvider, clientDoneLatch))); } - countDownLatch.countDown(); int numTimeoutExceptions = 0; for (Future<Integer> clientId : clientIds) { try { @@ -173,7 +174,7 @@ public class PooledClientProviderTest { CLIENT_OBTAIN_TIMEOUT), 1, numTimeoutExceptions); - executor.shutdown(); + executor.shutdownNow(); } private void waitFor(String errorMessage, Callable<Boolean> callable) throws Exception { @@ -189,24 +190,23 @@ public class PooledClientProviderTest { private static class RetrieveClient implements Callable<Integer> { private final PooledClientProvider pool; - private final long holdClientMs; - private final CountDownLatch begin; + private final CountDownLatch done; - public RetrieveClient(PooledClientProvider pool, long holdClientMs, - CountDownLatch begin) { + RetrieveClient(PooledClientProvider pool, CountDownLatch done) { this.pool = pool; - this.holdClientMs = holdClientMs; - this.begin = begin; + this.done = done; } @Override public Integer call() throws Exception { - begin.await(); try (CloseableThriftClient client = pool.getCloseableClient()) { int id = System.identityHashCode(client.getThriftClient()); - // "use" the client for a configured amount of milliseconds - Thread.sleep(holdClientMs); + done.countDown(); + done.await(); return id; + } catch (TimeoutException e) { + done.countDown(); + throw e; } } }
