Repository: ignite Updated Branches: refs/heads/ignite-5075 3f6a79f77 -> 56d87b36e
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56d87b36 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56d87b36 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56d87b36 Branch: refs/heads/ignite-5075 Commit: 56d87b36e07553d2fe37b2f0e45fb21e057e3fe7 Parents: 3f6a79f Author: sboikov <[email protected]> Authored: Tue May 16 15:48:13 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue May 16 15:48:13 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheGroupInfrastructure.java | 5 + .../cache/GridCacheAffinityManager.java | 38 -- .../processors/cache/GridCacheProcessor.java | 11 +- .../distributed/dht/GridDhtCacheAdapter.java | 13 - .../processors/cache/IgniteCacheGroupsTest.java | 429 ++++++++++++++++++- 5 files changed, 433 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/56d87b36/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 9581068..4241c51 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -537,6 +537,11 @@ public class CacheGroupInfrastructure { * */ void stopGroup() { + IgniteCheckedException err = + new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."); + + aff.cancelFutures(err); + offheapMgr.stop(); ctx.io().removeCacheGroupHandlers(grpId); http://git-wip-us.apache.org/repos/asf/ignite/blob/56d87b36/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java index 56ceefb..5d09a27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java @@ -74,44 +74,6 @@ public class GridCacheAffinityManager extends GridCacheManagerAdapter { } /** {@inheritDoc} */ - @Override protected void onKernalStop0(boolean cancel) { - cancelFutures(); - } - - /** - * - */ - public void cancelFutures() { - if (!starting.get()) - // Ignoring attempt to stop manager that has never been started. - return; - - IgniteCheckedException err = - new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."); - - if (aff != null) - aff.cancelFutures(err); - } - - /** {@inheritDoc} */ - @Override public void onDisconnected(IgniteFuture reconnectFut) { - // TODO IGNITE-5075. -// IgniteCheckedException err = new IgniteClientDisconnectedCheckedException(reconnectFut, -// "Failed to wait for topology update, client disconnected."); -// -// if (aff != null) -// aff.cancelFutures(err); - } - - /** - * - */ - public void onReconnected() { - // TODO IGNITE-5075. -// aff.onReconnected(); - } - - /** {@inheritDoc} */ @Override protected void stop0(boolean cancel, boolean destroy) { aff = null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/56d87b36/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index dddc525..b77b05d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cache.database.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.database.MemoryPolicy; @@ -1017,11 +1018,13 @@ public class GridCacheProcessor extends GridProcessorAdapter { * @param cancel Cancel. */ public void onKernalStopCaches(boolean cancel){ - for (GridCacheAdapter<?, ?> cache : caches.values()) { - GridCacheAffinityManager aff = cache.context().affinity(); + IgniteCheckedException affErr = + new IgniteCheckedException("Failed to wait for topology update, node is stopping."); + + for (CacheGroupInfrastructure grp : cacheGrps.values()) { + GridAffinityAssignmentCache aff = grp.affinity(); - if (aff != null) - aff.cancelFutures(); + aff.cancelFutures(affErr); } for (String cacheName : stopSeq) { http://git-wip-us.apache.org/repos/asf/ignite/blob/56d87b36/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 7c4d59a..7789673 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -177,19 +177,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap } /** {@inheritDoc} */ - @Override public void onReconnected() { - super.onReconnected(); - - ctx.affinity().onReconnected(); - - // TODO IGNITE-5075. -// top.onReconnected(); -// -// if (preldr != null) -// preldr.onReconnected(); - } - - /** {@inheritDoc} */ @Override public void printMemoryStats() { super.printMemoryStats(); http://git-wip-us.apache.org/repos/asf/ignite/blob/56d87b36/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index 8ff45f2..27df118 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -18,9 +18,14 @@ package org.apache.ignite.internal.processors.cache; import java.io.Serializable; +import java.util.Arrays; +import java.util.LinkedHashSet; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; @@ -40,12 +45,17 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; /** * @@ -74,6 +84,8 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { cfg.setClientMode(client); + cfg.setMarshaller(null); + return cfg; } @@ -262,6 +274,324 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testNoKeyIntersectTx() throws Exception { + testNoKeyIntersect(TRANSACTIONAL); + } + + /** + * @throws Exception If failed. + */ + public void testNoKeyIntersectAtomic() throws Exception { + testNoKeyIntersect(ATOMIC); + } + + /** + * @param atomicityMode Atomicity mode. + * @throws Exception If failed. + */ + private void testNoKeyIntersect(CacheAtomicityMode atomicityMode) throws Exception { + startGrid(0); + + testNoKeyIntersect(atomicityMode, false); + + testNoKeyIntersect(atomicityMode, true); + + startGridsMultiThreaded(1, 4); + + testNoKeyIntersect(atomicityMode, false); + + testNoKeyIntersect(atomicityMode, true); + } + + /** + * @param keys Keys. + * @param rnd Random. + * @return Added key. + */ + private Integer addKey(Set<Integer> keys, ThreadLocalRandom rnd) { + for (;;) { + Integer key = rnd.nextInt(100_000); + + if (keys.add(key)) + return key; + } + } + + /** + * @param atomicityMode Cache atomicity mode. + * @param heapCache On heap cache flag. + * @throws Exception If failed. + */ + private void testNoKeyIntersect(CacheAtomicityMode atomicityMode, boolean heapCache) throws Exception { + Ignite srv0 = ignite(0); + + try { + IgniteCache cache1 = srv0. + createCache(cacheConfiguration(GROUP1, "c1", PARTITIONED, atomicityMode, 1, heapCache)); + + Set<Integer> keys = new LinkedHashSet<>(30); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 10; i++) { + Integer key = addKey(keys, rnd); + + cache1.put(key, key); + cache1.put(new Key1(key), new Value1(key)); + cache1.put(new Key2(key), new Value2(key)); + } + + assertEquals(30, cache1.size()); + + IgniteCache cache2 = srv0. + createCache(cacheConfiguration(GROUP1, "c2", PARTITIONED, atomicityMode, 1, heapCache)); + + assertEquals(30, cache1.size()); + assertEquals(0 , cache2.size()); + + for (Integer key : keys) { + assertNull(cache2.get(key)); + assertNull(cache2.get(new Key1(key))); + assertNull(cache2.get(new Key2(key))); + + cache2.put(key, key + 1); + cache2.put(new Key1(key), new Value1(key + 1)); + cache2.put(new Key2(key), new Value2(key + 1)); + } + + assertEquals(30, cache1.size()); + assertEquals(30, cache2.size()); + + for (int i = 0; i < 10; i++) { + Integer key = addKey(keys, rnd); + + cache2.put(key, key + 1); + cache2.put(new Key1(key), new Value1(key + 1)); + cache2.put(new Key2(key), new Value2(key + 1)); + } + + assertEquals(30, cache1.size()); + assertEquals(60, cache2.size()); + + int i = 0; + + for (Integer key : keys) { + if (i++ < 10) { + assertEquals(key, cache1.get(key)); + assertEquals(new Value1(key), cache1.get(new Key1(key))); + assertEquals(new Value2(key), cache1.get(new Key2(key))); + } + else { + assertNull(cache1.get(key)); + assertNull(cache1.get(new Key1(key))); + assertNull(cache1.get(new Key2(key))); + } + + assertEquals(key + 1, cache2.get(key)); + assertEquals(new Value1(key + 1), cache2.get(new Key1(key))); + assertEquals(new Value2(key + 1), cache2.get(new Key2(key))); + } + + IgniteCache cache3 = srv0. + createCache(cacheConfiguration(GROUP1, "c3", PARTITIONED, atomicityMode, 1, heapCache)); + + assertEquals(30, cache1.size()); + assertEquals(60, cache2.size()); + assertEquals(0, cache3.size()); + + for (Integer key : keys) { + assertNull(cache3.get(key)); + assertNull(cache3.get(new Key1(key))); + assertNull(cache3.get(new Key2(key))); + } + + for (Integer key : keys) { + cache3.put(key, key); + cache3.put(new Key1(key), new Value1(key)); + cache3.put(new Key2(key), new Value2(key)); + } + + i = 0; + + for (Integer key : keys) { + if (i++ < 10) { + assertEquals(key, cache1.get(key)); + assertEquals(new Value1(key), cache1.get(new Key1(key))); + assertEquals(new Value2(key), cache1.get(new Key2(key))); + } + else { + assertNull(cache1.get(key)); + assertNull(cache1.get(new Key1(key))); + assertNull(cache1.get(new Key2(key))); + } + + assertEquals(key + 1, cache2.get(key)); + assertEquals(new Value1(key + 1), cache2.get(new Key1(key))); + assertEquals(new Value2(key + 1), cache2.get(new Key2(key))); + + assertEquals(key, cache3.get(key)); + assertEquals(new Value1(key), cache3.get(new Key1(key))); + assertEquals(new Value2(key), cache3.get(new Key2(key))); + } + + i = 0; + + for (Integer key : keys) { + if (i++ == 3) + break; + + cache1.remove(key); + cache1.remove(new Key1(key)); + cache1.remove(new Key2(key)); + + assertNull(cache1.get(key)); + assertNull(cache1.get(new Key1(key))); + assertNull(cache1.get(new Key2(key))); + + assertEquals(key + 1, cache2.get(key)); + assertEquals(new Value1(key + 1), cache2.get(new Key1(key))); + assertEquals(new Value2(key + 1), cache2.get(new Key2(key))); + + assertEquals(key, cache3.get(key)); + assertEquals(new Value1(key), cache3.get(new Key1(key))); + assertEquals(new Value2(key), cache3.get(new Key2(key))); + } + + cache1.removeAll(); + + for (Integer key : keys) { + assertNull(cache1.get(key)); + assertNull(cache1.get(new Key1(key))); + assertNull(cache1.get(new Key2(key))); + + assertEquals(key + 1, cache2.get(key)); + assertEquals(new Value1(key + 1), cache2.get(new Key1(key))); + assertEquals(new Value2(key + 1), cache2.get(new Key2(key))); + + assertEquals(key, cache3.get(key)); + assertEquals(new Value1(key), cache3.get(new Key1(key))); + assertEquals(new Value2(key), cache3.get(new Key2(key))); + } + + cache2.removeAll(); + + for (Integer key : keys) { + assertNull(cache1.get(key)); + assertNull(cache1.get(new Key1(key))); + assertNull(cache1.get(new Key2(key))); + + assertNull(cache2.get(key)); + assertNull(cache2.get(new Key1(key))); + assertNull(cache2.get(new Key2(key))); + + assertEquals(key, cache3.get(key)); + assertEquals(new Value1(key), cache3.get(new Key1(key))); + assertEquals(new Value2(key), cache3.get(new Key2(key))); + } + + if (atomicityMode == TRANSACTIONAL) + testNoKeyIntersectTxLocks(cache1, cache2); + } + finally { + srv0.destroyCaches(Arrays.asList("c1", "c2", "c3")); + } + } + + /** + * @param cache1 Cache1. + * @param cache2 Cache2. + * @throws Exception If failed. + */ + private void testNoKeyIntersectTxLocks(final IgniteCache cache1, final IgniteCache cache2) throws Exception { + final Ignite node = (Ignite)cache1.unwrap(Ignite.class); + + for (int i = 0; i < 5; i++) { + final Integer key = ThreadLocalRandom.current().nextInt(1000); + + Lock lock = cache1.lock(key); + + lock.lock(); + + try { + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + Lock lock1 = cache1.lock(key); + + assertFalse(lock1.tryLock()); + + Lock lock2 = cache2.lock(key); + + assertTrue(lock2.tryLock()); + + lock2.unlock(); + + return null; + } + }, "lockThread"); + + fut.get(10_000); + } + finally { + lock.unlock(); + } + + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache1.put(key, 1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache2.put(key, 2); + + tx.commit(); + } + + assertEquals(2, cache2.get(key)); + + return null; + } + }, "txThread"); + + fut.get(10_000); + + tx.commit(); + } + + assertEquals(1, cache1.get(key)); + assertEquals(2, cache2.get(key)); + + try (Transaction tx = node.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = (Integer)cache1.get(key); + + cache1.put(key, val + 10); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable() { + @Override public Object call() throws Exception { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache2.put(key, 3); + + tx.commit(); + } + + assertEquals(3, cache2.get(key)); + + return null; + } + }, "txThread"); + + fut.get(10_000); + + tx.commit(); + } + + assertEquals(11, cache1.get(key)); + assertEquals(3, cache2.get(key)); + } + } + + /** + * @throws Exception If failed. + */ public void testCacheApiTxPartitioned() throws Exception { cacheApiTest(PARTITIONED, TRANSACTIONAL); } @@ -376,7 +706,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void _testConcurrentOperations() throws Exception { + public void _testConcurrentOperationsAndDestroy() throws Exception { final int SRVS = 4; final int CLIENTS = 4; final int NODES = SRVS + CLIENTS; @@ -389,10 +719,15 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { final int CACHES = 8; + final int grp1Backups = ThreadLocalRandom.current().nextInt(3); + final int grp2Backups = ThreadLocalRandom.current().nextInt(3); + for (int i = 0; i < CACHES; i++) { - srv0.createCache(cacheConfiguration(GROUP1, GROUP1 + "-" + i, PARTITIONED, ATOMIC, i, i % 2 == 0)); + srv0.createCache( + cacheConfiguration(GROUP1, GROUP1 + "-" + i, PARTITIONED, ATOMIC, grp1Backups, i % 2 == 0)); - srv0.createCache(cacheConfiguration(GROUP2, GROUP2 + "-" + i, PARTITIONED, TRANSACTIONAL, i, i % 2 == 0)); + srv0.createCache( + cacheConfiguration(GROUP2, GROUP2 + "-" + i, PARTITIONED, TRANSACTIONAL, grp2Backups, i % 2 == 0)); } final AtomicInteger idx = new AtomicInteger(); @@ -436,22 +771,34 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { while (!stop.get()) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); - String grp = rnd.nextBoolean() ? GROUP1 : GROUP2; + String grp; + int backups; + + if (rnd.nextBoolean()) { + grp = GROUP1; + backups = grp1Backups; + } + else { + grp = GROUP2; + backups = grp2Backups; + } Ignite node = ignite(rnd.nextInt(NODES)); + log.info("Create cache [node=" + node.name() + ", grp=" + grp + ']'); + IgniteCache cache = node.createCache(cacheConfiguration(grp, "tmpCache", PARTITIONED, rnd.nextBoolean() ? ATOMIC : TRANSACTIONAL, - rnd.nextInt(3), + backups, rnd.nextBoolean())); for (int i = 0; i < 10; i++) cacheOperation(rnd, cache); - node.destroyCache(cache.getName()); + log.info("Destroy cache [node=" + node.name() + ", grp=" + grp + ']'); - U.sleep(1000); + node.destroyCache(cache.getName()); } } catch (Exception e) { @@ -517,7 +864,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { * @param cache Cache. */ private void cacheOperation(ThreadLocalRandom rnd, IgniteCache cache) { - Object key = cache.getName() + rnd.nextInt(1000); + Object key = rnd.nextInt(1000); cache.put(key, 1); } @@ -589,6 +936,72 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { } /** + * + */ + static class Value1 implements Serializable { + /** */ + private int val; + + /** + * @param val Value. + */ + public Value1(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Value1 val1 = (Value1)o; + + return val == val1.val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + } + + /** + * + */ + static class Value2 implements Serializable { + /** */ + private int val; + + /** + * @param val Value. + */ + public Value2(int val) { + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + Value2 val2 = (Value2)o; + + return val == val2.val; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return val; + } + } + + /** * @param grpName Cache group name. * @param name Cache name. * @param cacheMode Cache mode.
