ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9b850cc1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9b850cc1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9b850cc1 Branch: refs/heads/ignite-5075 Commit: 9b850cc181bcd25f2362edeed33f7aa8a097f555 Parents: e4be5ab Author: sboikov <[email protected]> Authored: Mon May 15 12:05:07 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon May 15 12:26:29 2017 +0300 ---------------------------------------------------------------------- .../distributed/GridDistributedBaseMessage.java | 1 - .../cache/transactions/IgniteTxEntry.java | 4 +- .../processors/cache/IgniteCacheGroupsTest.java | 187 ++++++++++++++++++- 3 files changed, 182 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9b850cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java index 65b16a4..fc209aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; http://git-wip-us.apache.org/repos/asf/ignite/blob/9b850cc1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 163ed99..30aa335 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -565,7 +565,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { */ public void cached(GridCacheEntryEx entry) { assert entry == null || entry.context() == ctx : "Invalid entry assigned to tx entry [txEntry=" + this + - ", entry=" + entry + ", ctxNear=" + ctx.isNear() + ", ctxDht=" + ctx.isDht() + ']'; + ", entry=" + entry + + ", ctxNear=" + ctx.isNear() + + ", ctxDht=" + ctx.isDht() + ']'; this.entry = entry; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9b850cc1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index 39dc044..c10321e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -19,15 +19,21 @@ package org.apache.ignite.internal.processors.cache; import java.io.Serializable; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -35,6 +41,8 @@ import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** @@ -84,7 +92,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { Ignite client = startGrid(1); - IgniteCache c1 = client.createCache(cacheConfiguration(GROUP1, "c1", ATOMIC, 0)); + IgniteCache c1 = client.createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 0)); checkCacheGroup(0, GROUP1, true); checkCacheGroup(0, GROUP1, true); @@ -137,7 +145,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { for (int iter = 0; iter < 3; iter++) { log.info("Iteration: " + iter); - srv0.createCache(cacheConfiguration(GROUP1, "cache1", ATOMIC, 2)); + srv0.createCache(cacheConfiguration(GROUP1, "cache1", PARTITIONED, ATOMIC, 2)); for (int i = 0; i < srvs; i++) { checkCacheGroup(i, GROUP1, true); @@ -145,7 +153,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { checkCache(i, "cache1"); } - srv0.createCache(cacheConfiguration(GROUP1, "cache2", ATOMIC, 2)); + srv0.createCache(cacheConfiguration(GROUP1, "cache2", PARTITIONED, ATOMIC, 2)); for (int i = 0; i < srvs; i++) { checkCacheGroup(i, GROUP1, true); @@ -193,8 +201,10 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { Ignite srv0 = startGrid(0); { - IgniteCache<Object, Object> cache1 = srv0.createCache(cacheConfiguration("grp1", "cache1", ATOMIC, 2)); - IgniteCache<Object, Object> cache2 = srv0.createCache(cacheConfiguration("grp1", "cache2", ATOMIC, 2)); + IgniteCache<Object, Object> cache1 = + srv0.createCache(cacheConfiguration("grp1", "cache1", PARTITIONED, ATOMIC, 2)); + IgniteCache<Object, Object> cache2 = + srv0.createCache(cacheConfiguration("grp1", "cache2", PARTITIONED, ATOMIC, 2)); cache1.put(new Key1(1), 1); assertEquals(1, cache1.get(new Key1(1))); @@ -231,8 +241,10 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { Ignite srv0 = startGrid(0); { - IgniteCache<Object, Object> cache1 = srv0.createCache(cacheConfiguration("grp1", "cache1", ATOMIC, 0)); - IgniteCache<Object, Object> cache2 = srv0.createCache(cacheConfiguration("grp1", "cache2", ATOMIC, 0)); + IgniteCache<Object, Object> cache1 = + srv0.createCache(cacheConfiguration(GROUP1, "cache1", PARTITIONED, ATOMIC, 0)); + IgniteCache<Object, Object> cache2 = + srv0.createCache(cacheConfiguration(GROUP1, "cache2", PARTITIONED, ATOMIC, 0)); for (int i = 0; i < 10; i++) { cache1.put(new Key1(i), 1); @@ -246,6 +258,162 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. + */ + public void testCacheApiTx() throws Exception { + startGridsMultiThreaded(4); + + client = true; + + startGrid(4); + + cacheApiTest(PARTITIONED, TRANSACTIONAL, 2); + } + + /** + * @param cacheMode Cache mode. + * @param atomicityMode Atomicity mode. + * @param backups Number of backups. + */ + private void cacheApiTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode, int backups) { + for (int i = 0; i < 2; i++) + ignite(0).createCache(cacheConfiguration(GROUP1, "cache-" + i, cacheMode, atomicityMode, backups)); + + for (Ignite node : Ignition.allGrids()) { + for (int i = 0; i < 2; i++) { + IgniteCache cache = node.cache("cache-" + i); + + log.info("Test cache [node=" + node.name() + ", cache=" + cache.getName() + + ", mode=" + cacheMode + ", atomicity=" + atomicityMode + ", backups=" + backups + ']'); + + cacheApiTest(cache); + } + } + } + + /** + * @param cache Cache. + */ + private void cacheApiTest(IgniteCache cache) { + int key = 1; + + cache.put(key, 1); + + cache.remove(key); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentOperations() throws Exception { + final int SRVS = 4; + final int CLIENTS = 4; + final int NODES = SRVS + CLIENTS; + + Ignite srv0 = startGridsMultiThreaded(SRVS); + + client = true; + + startGridsMultiThreaded(SRVS, CLIENTS); + + final int CACHES = 4; + + for (int i = 0; i < CACHES; i++) { + srv0.createCache(cacheConfiguration(GROUP1, GROUP1 + "-" + i, PARTITIONED, ATOMIC, i)); + srv0.createCache(cacheConfiguration(GROUP2, GROUP2 + "-" + i, PARTITIONED, TRANSACTIONAL, i)); + } + + final AtomicInteger idx = new AtomicInteger(); + + final AtomicBoolean err = new AtomicBoolean(); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture opFut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + try { + Ignite node = ignite(idx.getAndIncrement() % NODES); + + log.info("Start thread [node=" + node.name() + ']'); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (!stop.get()) { + String grp = rnd.nextBoolean() ? GROUP1 : GROUP2; + int cacheIdx = rnd.nextInt(CACHES); + + IgniteCache cache = node.cache(grp + "-" + cacheIdx); + + for (int i = 0; i < 10; i++) + cacheOperation(rnd, cache); + } + } + catch (Exception e) { + err.set(true); + + log.error("Unexpected error: " + e, e); + + stop.set(true); + } + } + }, (SRVS + CLIENTS) * 2, "op-thread"); + + IgniteInternalFuture cacheFut = GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + while (!stop.get()) { +// ThreadLocalRandom rnd = ThreadLocalRandom.current(); +// +// String grp = rnd.nextBoolean() ? GROUP1 : GROUP2; +// +// Ignite node = ignite(rnd.nextInt(NODES)); +// +// IgniteCache cache = node.createCache(cacheConfiguration(grp, "tmpCache", +// rnd.nextBoolean() ? ATOMIC : TRANSACTIONAL, +// rnd.nextInt(3))); +// +// for (int i = 0; i < 10; i++) +// cacheOperation(rnd, cache); +// +// node.destroyCache(cache.getName()); + + U.sleep(1000); + } + } + catch (Exception e) { + err.set(true); + + log.error("Unexpected error: " + e, e); + + stop.set(true); + } + } + }, "cache-thread"); + + try { + U.sleep(10_000); + } + finally { + stop.set(true); + } + + opFut.get(); + cacheFut.get(); + + assertFalse("Unexpected error, see log for details", err.get()); + } + + /** + * @param rnd Random. + * @param cache Cache. + */ + private void cacheOperation(ThreadLocalRandom rnd, IgniteCache cache) { + int key = rnd.nextInt(1000); + + cache.put(key, 1); + } + + /** * */ static class Key1 implements Serializable { @@ -311,8 +479,10 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { } } - private CacheConfiguration cacheConfiguration(String grpName, + private CacheConfiguration cacheConfiguration( + String grpName, String name, + CacheMode cacheMode, CacheAtomicityMode atomicityMode, int backups) { CacheConfiguration ccfg = new CacheConfiguration(); @@ -321,6 +491,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { ccfg.setGroupName(grpName); ccfg.setAtomicityMode(atomicityMode); ccfg.setBackups(backups); + ccfg.setCacheMode(cacheMode); ccfg.setWriteSynchronizationMode(FULL_SYNC); return ccfg;
