Ignite-5075 pending

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/70a2fe87
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/70a2fe87
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/70a2fe87

Branch: refs/heads/ignite-5075-pds
Commit: 70a2fe878ec96347683da6e121d21bef77d49cce
Parents: e65697d
Author: Igor Seliverstov <[email protected]>
Authored: Wed May 17 17:43:21 2017 +0300
Committer: Igor Seliverstov <[email protected]>
Committed: Fri May 19 12:33:16 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheGroupsTest.java | 256 ++++++++++++++++---
 1 file changed, 223 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/70a2fe87/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index f1b5345..403973b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -34,11 +34,14 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
 import java.util.concurrent.locks.Lock;
 import javax.cache.CacheException;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheExistsException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
@@ -55,6 +58,7 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.lang.GridPlainCallable;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -771,6 +775,23 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Creates a map with random integers.
+     *
+     * @param cnt Map size.
+     * @return Map with random integers.
+     */
+    private Map<Integer, Integer> generateDataMap(int cnt) {
+        Random rnd = ThreadLocalRandom.current();
+
+        Map<Integer, Integer> data = U.newHashMap(cnt);
+
+        for (int i = 0; i < cnt; i++)
+            data.put(i, rnd.nextInt());
+
+        return data;
+    }
+
+    /**
      * @param cnt Sequence length.
      * @return Sequence of integers.
      */
@@ -1494,61 +1515,230 @@ public class IgniteCacheGroupsTest extends 
GridCommonAbstractTest {
      * @param backups Number of backups.
      * @param heapCache On heap cache flag.
      */
-    private void cacheApiTest(CacheMode cacheMode, CacheAtomicityMode 
atomicityMode, int backups, boolean heapCache) {
-        for (int i = 0; i < 2; i++)
-            ignite(0).createCache(cacheConfiguration(GROUP1, "cache-" + i, 
cacheMode, atomicityMode, backups, heapCache));
+    private void cacheApiTest(final CacheMode cacheMode,
+        final CacheAtomicityMode atomicityMode,
+        final int backups,
+        final boolean heapCache) throws Exception {
+        Ignite srv0 = ignite(0);
+
+        srv0.createCache(cacheConfiguration(GROUP1, "cache-0", cacheMode, 
atomicityMode, backups, heapCache));
+        srv0.createCache(cacheConfiguration(GROUP1, "cache-1", cacheMode, 
atomicityMode, backups, heapCache));
+        srv0.createCache(cacheConfiguration(GROUP2, "cache-2", cacheMode, 
atomicityMode, backups, heapCache));
+        srv0.createCache(cacheConfiguration(null, "cache-3", cacheMode, 
atomicityMode, backups, heapCache));
+
+        awaitPartitionMapExchange();
 
         try {
-            for (Ignite node : Ignition.allGrids()) {
-                for (int i = 0; i < 2; i++) {
-                    IgniteCache cache = node.cache("cache-" + i);
-
-                    log.info("Test cache [node=" + node.name() +
-                        ", cache=" + cache.getName() +
-                        ", mode=" + cacheMode +
-                        ", atomicity=" + atomicityMode +
-                        ", backups=" + backups +
-                        ", heapCache=" + heapCache +
-                        ']');
-
-                    cacheApiTest(cache);
-                }
+            for (final Ignite node : Ignition.allGrids()) {
+                List<Callable<?>> ops = new ArrayList<>();
+
+                for (int i = 0; i < 4; i++)
+                    ops.add(testSet(node.cache("cache-" + i), cacheMode, 
atomicityMode, backups, heapCache, node));
+
+                // Run each operation sequentially in the current thread.
+                for (Callable<?> op : ops)
+                    op.call();
+
+                // Run the same operations again from multiple threads.
+                GridTestUtils.runMultiThreaded(ops, "cacheApiTest");
             }
         }
         finally {
-            for (int i = 0; i < 2; i++)
-                ignite(0).destroyCache("cache-" + i);
+            for (int i = 0; i < 4; i++)
+                srv0.destroyCache("cache-" + i);
         }
     }
 
     /**
      * @param cache Cache.
+     * @param cacheMode Cache mode.
+     * @param atomicityMode Atomicity mode.
+     * @param backups Number of backups.
+     * @param heapCache On heap cache flag.
+     * @param node Ignite node.
+     * @return Callable for the test operations.
+     */
+    private Callable<?> testSet(
+        final IgniteCache<Object, Object> cache,
+        final CacheMode cacheMode,
+        final CacheAtomicityMode atomicityMode,
+        final int backups,
+        final boolean heapCache,
+        final Ignite node) {
+        return new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                log.info("Test cache [node=" + node.name() +
+                    ", cache=" + cache.getName() +
+                    ", mode=" + cacheMode +
+                    ", atomicity=" + atomicityMode +
+                    ", backups=" + backups +
+                    ", heapCache=" + heapCache +
+                    ']');
+
+                cacheApiTest(cache);
+
+                return null;
+            }
+        };
+    }
+
+    /**
+     * @param cache Cache.
      */
     private void cacheApiTest(IgniteCache cache) {
-        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+        cachePutAllGetAll(cache);
+        cachePutRemove(cache);
+        cachePutGet(cache);
+        cachePutGetAndPut(cache);
+        cacheQuery(cache);
+        cacheInvokeAll(cache);
+        cacheInvoke(cache);
+    }
 
-        for (int i = 0; i < 10; i++) {
-            Integer key = rnd.nextInt(10_000);
+    private void tearDown(IgniteCache cache) {
+        cache.clear();
+        cache.removeAll();
+    }
+
+    private void cachePutAllGetAll(IgniteCache cache) {
+        Map<Integer, Integer> data = generateDataMap(10000);
+
+        cache.putAll(data);
+
+        Map data0 = cache.getAll(data.keySet());
+
+        assertEquals(data.size(), data0.size());
+
+        for (Map.Entry<Integer, Integer> entry : data.entrySet()) {
+            assertEquals(entry.getValue(), data0.get(entry.getKey()));
+        }
+
+        tearDown(cache);
+    }
 
-            assertNull(cache.get(key));
-            assertFalse(cache.containsKey(key));
+    private void cachePutRemove(IgniteCache cache) {
+        Random rnd = ThreadLocalRandom.current();
+
+        Integer key = rnd.nextInt();
+        Integer val = rnd.nextInt();
 
-            Integer val = key + 1;
+        cache.put(key, val);
 
-            cache.put(key, val);
+        assertTrue(cache.remove(key));
 
-            assertEquals(val, cache.get(key));
-            assertTrue(cache.containsKey(key));
+        assertNull(cache.get(key));
 
-            cache.remove(key);
+        tearDown(cache);
+    }
 
-            assertNull(cache.get(key));
-            assertFalse(cache.containsKey(key));
+    private void cachePutGet(IgniteCache cache) {
+        Random rnd = ThreadLocalRandom.current();
+
+        Integer key = rnd.nextInt();
+        Integer val = rnd.nextInt();
+
+        cache.put(key, val);
+
+        Object val0 = cache.get(key);
+
+        assertEquals(val, val0);
+
+        tearDown(cache);
+    }
+
+    private void cachePutGetAndPut(IgniteCache cache) {
+        Random rnd = ThreadLocalRandom.current();
+
+        Integer key = rnd.nextInt();
+        Integer val1 = rnd.nextInt();
+        Integer val2 = rnd.nextInt();
+
+        cache.put(key, val1);
+
+        Object val0 = cache.getAndPut(key, val2);
+
+        assertEquals(val1, val0);
+
+        val0 = cache.get(key);
+
+        assertEquals(val2, val0);
+
+        tearDown(cache);
+    }
+
+    private void cacheQuery(IgniteCache cache) {
+        Map<Integer, Integer> data = generateDataMap(10000);
+
+        cache.putAll(data);
+
+        ScanQuery<Integer, Integer> qry = new ScanQuery<>(new 
IgniteBiPredicate<Integer, Integer>() {
+            @Override public boolean apply(Integer integer, Integer integer2) {
+                return integer % 2 == 0;
+            }
+        });
+
+        List<Cache.Entry<Integer, Integer>> all = cache.query(qry).getAll();
+
+        assertEquals(all.size(), data.size() / 2);
+
+        for (Cache.Entry<Integer, Integer> entry : all) {
+            assertEquals(0, entry.getKey() % 2);
+            assertEquals(entry.getValue(), data.get(entry.getKey()));
         }
 
-        cache.clear();
+        tearDown(cache);
+    }
 
-        cache.removeAll();
+    private void cacheInvokeAll(IgniteCache cache) {
+        Map<Integer, Integer> data = generateDataMap(10000);
+
+        cache.putAll(data);
+
+        Random rnd = ThreadLocalRandom.current();
+
+        int one = rnd.nextInt();
+        int two = rnd.nextInt();
+
+        Map<Integer, CacheInvokeResult<Integer>> res = 
cache.invokeAll(data.keySet(), new CacheEntryProcessor<Integer, Integer, 
Integer>() {
+            @Override public Integer process(MutableEntry<Integer, Integer> 
entry, Object... arguments) throws EntryProcessorException {
+                Object removed = ((Map)arguments[0]).remove(entry.getKey());
+
+                assertEquals(removed, entry.getValue());
+
+                // Some calculation
+                return (Integer)arguments[1] + (Integer)arguments[2];
+            }
+        }, data, one, two);
+
+        assertEquals(10000, res.size());
+        assertEquals(one + two, (Object)res.get(0).get());
+
+        tearDown(cache);
+    }
+
+    private void cacheInvoke(IgniteCache cache) {
+        Random rnd = ThreadLocalRandom.current();
+
+        Integer key = rnd.nextInt();
+        Integer val = rnd.nextInt();
+
+        cache.put(key, val);
+
+        int one = rnd.nextInt();
+        int two = rnd.nextInt();
+
+        Object res = cache.invoke(key, new CacheEntryProcessor<Integer, 
Integer, Integer>() {
+            @Override public Integer process(MutableEntry<Integer, Integer> 
entry, Object... arguments) throws EntryProcessorException {
+                assertEquals(arguments[0], entry.getValue());
+
+                // Some calculation
+                return (Integer)arguments[1] + (Integer)arguments[2];
+            }
+        }, val, one, two);
+
+        assertEquals(one + two, res);
+
+        tearDown(cache);
     }
 
     /**

Reply via email to