Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 68e799c8e -> a5f59f09a


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a5f59f09
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a5f59f09
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a5f59f09

Branch: refs/heads/ignite-5075
Commit: a5f59f09a4848d3e781066a25b65a698763280a1
Parents: 68e799c
Author: sboikov <[email protected]>
Authored: Tue May 16 17:11:24 2017 +0300
Committer: sboikov <[email protected]>
Committed: Tue May 16 17:11:24 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheGroupsTest.java | 111 ++++++++++++++++++-
 1 file changed, 110 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a5f59f09/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index 20ac80c..601ebed 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -18,8 +18,12 @@
 package org.apache.ignite.internal.processors.cache;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ThreadLocalRandom;
@@ -706,6 +710,110 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testConcurrentOperationsSameKeys() throws Exception { // Concurrent putAll of same keys, 4 caches of one group.
+        final int SRVS = 4;
+        final int CLIENTS = 4;
+        final int NODES = SRVS + CLIENTS;
+
+        startGrid(0);
+
+        Ignite srv0 = startGridsMultiThreaded(1, SRVS - 1);
+
+        client = true; // NOTE(review): presumably makes the grids started below client nodes - confirm.
+
+        startGridsMultiThreaded(SRVS, CLIENTS);
+
+        srv0.createCache(cacheConfiguration(GROUP1, "a0", PARTITIONED, ATOMIC, 
1, false));
+        srv0.createCache(cacheConfiguration(GROUP1, "a1", PARTITIONED, ATOMIC, 
1, false));
+        srv0.createCache(cacheConfiguration(GROUP1, "t0", PARTITIONED, 
TRANSACTIONAL, 1, false));
+        srv0.createCache(cacheConfiguration(GROUP1, "t1", PARTITIONED, 
TRANSACTIONAL, 1, false));
+
+        final List<Integer> keys = new ArrayList<>(); // Keys 0..49, used for every cache.
+
+        for (int i = 0; i < 50; i++)
+            keys.add(i);
+
+        final AtomicBoolean err = new AtomicBoolean(); // Set by an update thread that catches an exception.
+
+        final AtomicBoolean stop = new AtomicBoolean(); // Ends the update loops once set.
+
+        IgniteInternalFuture fut1 = updateFuture(NODES, "a0", keys, false, 
stop, err);
+        IgniteInternalFuture fut2 = updateFuture(NODES, "a1", keys, true, 
stop, err);
+        IgniteInternalFuture fut3 = updateFuture(NODES, "t0", keys, false, 
stop, err);
+        IgniteInternalFuture fut4 = updateFuture(NODES, "t1", keys, true, 
stop, err); // a1/t1 put keys in reverse order, a0/t0 in list order.
+
+        try {
+            for (int i = 0; i < 15 && !stop.get(); i++) // Up to 15 sleeps; ends early once stop is set.
+                U.sleep(1_000);
+        }
+        finally {
+            stop.set(true);
+        }
+
+        fut1.get();
+        fut2.get();
+        fut3.get();
+        fut4.get();
+
+        assertFalse("Unexpected error, see log for details", err.get());
+    }
+
+    /**
+     * @param nodes Total number of nodes; each task uses the node at its start index modulo this value.
+     * @param cacheName Name of the cache that every task updates with {@code putAll}.
+     * @param keys Keys to update.
+     * @param reverse {@code True} to put keys in reverse order (value 2), else in list order (value 1).
+     * @param stop Stop flag: tasks loop until it is set, and set it themselves on error.
+     * @param err Error flag, set when a task catches an exception.
+     * @return Update future.
+     */
+    private IgniteInternalFuture updateFuture(final int nodes,
+        final String cacheName,
+        final List<Integer> keys,
+        final boolean reverse,
+        final AtomicBoolean stop,
+        final AtomicBoolean err) {
+        final AtomicInteger idx = new AtomicInteger(); // Shared counter that spreads tasks across nodes.
+
+        return GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                try {
+                    Ignite node = ignite(idx.getAndIncrement() % nodes);
+
+                    log.info("Start thread [node=" + node.name() + ']');
+
+                    IgniteCache cache = node.cache(cacheName);
+
+                    Map<Integer, Integer> map = new LinkedHashMap<>(); // Insertion-ordered, so key order is preserved.
+
+                    if (reverse) {
+                        for (int i = keys.size() - 1; i >= 0; i--)
+                            map.put(keys.get(i), 2);
+                    }
+                    else {
+                        for (Integer key : keys)
+                            map.put(key, 1);
+                    }
+
+                    while (!stop.get())
+                        cache.putAll(map);
+                }
+                catch (Exception e) {
+                    err.set(true);
+
+                    log.error("Unexpected error: " + e, e);
+
+                    stop.set(true); // Makes every loop polling this flag exit.
+                }
+
+                return null;
+            }
+        }, nodes * 2, "update-" + cacheName + "-" + reverse);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testConcurrentOperationsAndCacheDestroy() throws Exception {
         final int SRVS = 4;
         final int CLIENTS = 4;
@@ -818,7 +926,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
         }, "cache-destroy-thread");
 
         try {
-            U.sleep(30_000);
+            for (int i = 0; i < 30 && !stop.get(); i++)
+                U.sleep(1_000);
         }
         finally {
             stop.set(true);

Reply via email to