Ignite-5075 pending
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70a2fe87 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70a2fe87 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70a2fe87 Branch: refs/heads/ignite-5075-pds Commit: 70a2fe878ec96347683da6e121d21bef77d49cce Parents: e65697d Author: Igor Seliverstov <[email protected]> Authored: Wed May 17 17:43:21 2017 +0300 Committer: Igor Seliverstov <[email protected]> Committed: Fri May 19 12:33:16 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheGroupsTest.java | 256 ++++++++++++++++--- 1 file changed, 223 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/70a2fe87/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index f1b5345..403973b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -34,11 +34,14 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; import java.util.concurrent.locks.Lock; import javax.cache.CacheException; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheExistsException; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; @@ -55,6 +58,7 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridPlainCallable; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -771,6 +775,23 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { } /** + * Creates a map with random integers. + * + * @param cnt Map size length. + * @return Map with random integers. + */ + private Map<Integer, Integer> generateDataMap(int cnt) { + Random rnd = ThreadLocalRandom.current(); + + Map<Integer, Integer> data = U.newHashMap(cnt); + + for (int i = 0; i < cnt; i++) + data.put(i, rnd.nextInt()); + + return data; + } + + /** * @param cnt Sequence length. * @return Sequence of integers. */ @@ -1494,61 +1515,230 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { * @param backups Number of backups. * @param heapCache On heap cache flag. */ - private void cacheApiTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode, int backups, boolean heapCache) { - for (int i = 0; i < 2; i++) - ignite(0).createCache(cacheConfiguration(GROUP1, "cache-" + i, cacheMode, atomicityMode, backups, heapCache)); + private void cacheApiTest(final CacheMode cacheMode, + final CacheAtomicityMode atomicityMode, + final int backups, + final boolean heapCache) throws Exception { + Ignite srv0 = ignite(0); + + srv0.createCache(cacheConfiguration(GROUP1, "cache-0", cacheMode, atomicityMode, backups, heapCache)); + srv0.createCache(cacheConfiguration(GROUP1, "cache-1", cacheMode, atomicityMode, backups, heapCache)); + srv0.createCache(cacheConfiguration(GROUP2, "cache-2", cacheMode, atomicityMode, backups, heapCache)); + srv0.createCache(cacheConfiguration(null, "cache-3", cacheMode, atomicityMode, backups, heapCache)); + + awaitPartitionMapExchange(); try { - for (Ignite node : Ignition.allGrids()) { - for (int i = 0; i < 2; i++) { - IgniteCache cache = node.cache("cache-" + i); - - log.info("Test cache [node=" + node.name() + - ", cache=" + cache.getName() + - ", mode=" + cacheMode + - ", atomicity=" + atomicityMode + - ", backups=" + backups + - ", heapCache=" + heapCache + - ']'); - - cacheApiTest(cache); - } + for (final Ignite node : Ignition.allGrids()) { + List<Callable<?>> ops = new ArrayList<>(); + + for (int i = 0; i < 4; i++) + ops.add(testSet(node.cache("cache-" + i), cacheMode, atomicityMode, backups, heapCache, node)); + + // sync operations + for (Callable<?> op : ops) + op.call(); + + // async operations + GridTestUtils.runMultiThreaded(ops, "cacheApiTest"); } } finally { - for (int i = 0; i < 2; i++) - ignite(0).destroyCache("cache-" + i); + for (int i = 0; i < 4; i++) + srv0.destroyCache("cache-" + i); } } /** * @param cache Cache. + * @param cacheMode Cache mode. + * @param atomicityMode Atomicity mode. + * @param backups Number of backups. + * @param heapCache On heap cache flag. + * @param node Ignite node. + * @return Callable for the test operations. + */ + private Callable<?> testSet( + final IgniteCache<Object, Object> cache, + final CacheMode cacheMode, + final CacheAtomicityMode atomicityMode, + final int backups, + final boolean heapCache, + final Ignite node) { + return new Callable<Void>() { + @Override public Void call() throws Exception { + log.info("Test cache [node=" + node.name() + + ", cache=" + cache.getName() + + ", mode=" + cacheMode + + ", atomicity=" + atomicityMode + + ", backups=" + backups + + ", heapCache=" + heapCache + + ']'); + + cacheApiTest(cache); + + return null; + } + }; + } + + /** + * @param cache Cache. */ private void cacheApiTest(IgniteCache cache) { - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + cachePutAllGetAll(cache); + cachePutRemove(cache); + cachePutGet(cache); + cachePutGetAndPut(cache); + cacheQuery(cache); + cacheInvokeAll(cache); + cacheInvoke(cache); + } - for (int i = 0; i < 10; i++) { - Integer key = rnd.nextInt(10_000); + private void tearDown(IgniteCache cache) { + cache.clear(); + cache.removeAll(); + } + + private void cachePutAllGetAll(IgniteCache cache) { + Map<Integer, Integer> data = generateDataMap(10000); + + cache.putAll(data); + + Map data0 = cache.getAll(data.keySet()); + + assertEquals(data.size(), data0.size()); + + for (Map.Entry<Integer, Integer> entry : data.entrySet()) { + assertEquals(entry.getValue(), data0.get(entry.getKey())); + } + + tearDown(cache); + } - assertNull(cache.get(key)); - assertFalse(cache.containsKey(key)); + private void cachePutRemove(IgniteCache cache) { + Random rnd = ThreadLocalRandom.current(); + + Integer key = rnd.nextInt(); + Integer val = rnd.nextInt(); - Integer val = key + 1; + cache.put(key, val); - cache.put(key, val); + assertTrue(cache.remove(key)); - assertEquals(val, cache.get(key)); - assertTrue(cache.containsKey(key)); + assertNull(cache.get(key)); - cache.remove(key); + tearDown(cache); + } - assertNull(cache.get(key)); - assertFalse(cache.containsKey(key)); + private void cachePutGet(IgniteCache cache) { + Random rnd = ThreadLocalRandom.current(); + + Integer key = rnd.nextInt(); + Integer val = rnd.nextInt(); + + cache.put(key, val); + + Object val0 = cache.get(key); + + assertEquals(val, val0); + + tearDown(cache); + } + + private void cachePutGetAndPut(IgniteCache cache) { + Random rnd = ThreadLocalRandom.current(); + + Integer key = rnd.nextInt(); + Integer val1 = rnd.nextInt(); + Integer val2 = rnd.nextInt(); + + cache.put(key, val1); + + Object val0 = cache.getAndPut(key, val2); + + assertEquals(val1, val0); + + val0 = cache.get(key); + + assertEquals(val2, val0); + + tearDown(cache); + } + + private void cacheQuery(IgniteCache cache) { + Map<Integer, Integer> data = generateDataMap(10000); + + cache.putAll(data); + + ScanQuery<Integer, Integer> qry = new ScanQuery<>(new IgniteBiPredicate<Integer, Integer>() { + @Override public boolean apply(Integer integer, Integer integer2) { + return integer % 2 == 0; + } + }); + + List<Cache.Entry<Integer, Integer>> all = cache.query(qry).getAll(); + + assertEquals(all.size(), data.size() / 2); + + for (Cache.Entry<Integer, Integer> entry : all) { + assertEquals(0, entry.getKey() % 2); + assertEquals(entry.getValue(), data.get(entry.getKey())); } - cache.clear(); + tearDown(cache); + } - cache.removeAll(); + private void cacheInvokeAll(IgniteCache cache) { + Map<Integer, Integer> data = generateDataMap(10000); + + cache.putAll(data); + + Random rnd = ThreadLocalRandom.current(); + + int one = rnd.nextInt(); + int two = rnd.nextInt(); + + Map<Integer, CacheInvokeResult<Integer>> res = cache.invokeAll(data.keySet(), new CacheEntryProcessor<Integer, Integer, Integer>() { + @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException { + Object removed = ((Map)arguments[0]).remove(entry.getKey()); + + assertEquals(removed, entry.getValue()); + + // Some calculation + return (Integer)arguments[1] + (Integer)arguments[2]; + } + }, data, one, two); + + assertEquals(10000, res.size()); + assertEquals(one + two, (Object)res.get(0).get()); + + tearDown(cache); + } + + private void cacheInvoke(IgniteCache cache) { + Random rnd = ThreadLocalRandom.current(); + + Integer key = rnd.nextInt(); + Integer val = rnd.nextInt(); + + cache.put(key, val); + + int one = rnd.nextInt(); + int two = rnd.nextInt(); + + Object res = cache.invoke(key, new CacheEntryProcessor<Integer, Integer, Integer>() { + @Override public Integer process(MutableEntry<Integer, Integer> entry, Object... arguments) throws EntryProcessorException { + assertEquals(arguments[0], entry.getValue()); + + // Some calculation + return (Integer)arguments[1] + (Integer)arguments[2]; + } + }, val, one, two); + + assertEquals(one + two, res); + + tearDown(cache); } /**
