IGNITE-1681: Dogpile effect tests for CacheStoreBalancingWrapper
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d70f7eda Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d70f7eda Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d70f7eda Branch: refs/heads/ignite-1282 Commit: d70f7eda0492857ffd4879c311c814867552070e Parents: 7ba2efb Author: Andrey Gura <ag...@gridgain.com> Authored: Tue Nov 10 13:59:38 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Tue Nov 10 13:59:38 2015 +0300 ---------------------------------------------------------------------- .../store/GridCacheBalancingStoreSelfTest.java | 181 ++++++++++++++++++- 1 file changed, 180 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d70f7eda/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java index d41a441..1e3e4b4 100644 --- a/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/cache/store/GridCacheBalancingStoreSelfTest.java @@ -17,10 +17,14 @@ package org.apache.ignite.cache.store; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -29,8 +33,10 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.cache.Cache; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.processors.cache.CacheStoreBalancingWrapper; import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.testframework.GridTestUtils; @@ -118,6 +124,81 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testConcurrentLoad() throws Exception { + int threads = 5; + + final int keys = 50; + + final CyclicBarrier beforeBarrier = new CyclicBarrier(threads); + + ConcurrentVerifyStore store = new ConcurrentVerifyStore(keys); + + final CacheStoreBalancingWrapper<Integer, Integer> wrapper =new CacheStoreBalancingWrapper<>(store); + + GridTestUtils.runMultiThreaded(new Runnable() { + @Override public void run() { + for (int i = 0; i < keys; i++) { + try { + beforeBarrier.await(); + } + catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + + info("Load key: " + i); + + wrapper.load(i); + } + } + }, threads, "load-thread"); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentLoadAll() throws Exception { + int threads = 5; + + final int threshold = 5; + + final int keysCnt = 100; + + final CyclicBarrier beforeBarrier = new CyclicBarrier(threads); + + ConcurrentVerifyStore store = new ConcurrentVerifyStore(keysCnt); + + final CacheStoreBalancingWrapper<Integer, Integer> wrapper = new CacheStoreBalancingWrapper<>(store); + + GridTestUtils.runMultiThreaded(new Runnable() { + @Override public void run() { + for (int i = 0; i < keysCnt; i += threshold) { + try { + beforeBarrier.await(); + } + catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + + List<Integer> keys = new ArrayList<>(threshold); + + for (int j = i; j < i + threshold; j++) + keys.add(j); + + info("Load keys: " + keys); + + wrapper.loadAll(keys, new IgniteBiInClosure<Integer, Integer>() { + @Override public void apply(Integer integer, Integer integer2) { + // No-op. + } + }); + } + } + }, threads, "load-thread"); + } + + /** * */ private static class VerifyStore implements CacheStore<Integer, Integer> { @@ -204,4 +285,102 @@ public class GridCacheBalancingStoreSelfTest extends GridCommonAbstractTest { // No-op. } } -} \ No newline at end of file + + /** + * + */ + private static class ConcurrentVerifyStore implements CacheStore<Integer, Integer> { + + /** Cnts. */ + private final AtomicInteger[] cnts; + + /** + */ + private ConcurrentVerifyStore(int keys) { + this.cnts = new AtomicInteger[keys]; + + for (int i = 0; i < keys; i++) + cnts[i] = new AtomicInteger(); + } + + /** + * {@inheritDoc} + */ + @Override public Integer load(Integer key) { + try { + U.sleep(500); + } + catch (IgniteInterruptedCheckedException e) { + throw new RuntimeException(e); + } + + assertEquals("Redundant load call.", 1, cnts[key].incrementAndGet()); + + return key; + } + + /** + * {@inheritDoc} + */ + @Override public void loadCache(IgniteBiInClosure<Integer, Integer> clo, @Nullable Object... args) { + // No-op. + } + + /** + * {@inheritDoc} + */ + @Override public Map<Integer, Integer> loadAll(Iterable<? extends Integer> keys) { + try { + U.sleep(500); + } + catch (IgniteInterruptedCheckedException e) { + e.printStackTrace(); + } + + Map<Integer, Integer> loaded = new HashMap<>(); + + for (Integer key : keys) { + assertEquals("Redundant loadAll call.", 1, cnts[key].incrementAndGet()); + + loaded.put(key, key); + } + + return loaded; + } + + /** + * {@inheritDoc} + */ + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) { + // No-op. + } + + /** + * {@inheritDoc} + */ + @Override public void writeAll(Collection<Cache.Entry<? extends Integer, ? extends Integer>> entries) { + // No-op. + } + + /** + * {@inheritDoc} + */ + @Override public void delete(Object key) { + // No-op. + } + + /** + * {@inheritDoc} + */ + @Override public void deleteAll(Collection<?> keys) { + // No-op. + } + + /** + * {@inheritDoc} + */ + @Override public void sessionEnd(boolean commit) { + // No-op. + } + } +}