Repository: ignite Updated Branches: refs/heads/ignite-5075 b5eab9692 -> bf4bc78c6
ignite-5075 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bf4bc78c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bf4bc78c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bf4bc78c Branch: refs/heads/ignite-5075 Commit: bf4bc78c63d295c68a6e2b4a7f26bab280ddd91d Parents: b5eab96 Author: sboikov <[email protected]> Authored: Mon May 22 11:18:58 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon May 22 11:18:58 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheGroupsTest.java | 402 ++++++++++++++++++- 1 file changed, 397 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/bf4bc78c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java index f1b5345..ee76c6e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java @@ -18,38 +18,49 @@ package org.apache.ignite.internal.processors.cache; import java.io.Serializable; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.TreeSet; import java.util.ArrayList; import java.util.Arrays; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import javax.cache.Cache; import java.util.concurrent.locks.Lock; +import javax.cache.Cache; import javax.cache.CacheException; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheExistsException; +import org.apache.ignite.cache.CacheInterceptor; +import org.apache.ignite.cache.CacheInterceptorAdapter; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.affinity.AffinityKeyMapper; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.lang.GridPlainCallable; @@ -1819,6 +1830,387 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest { } /** + * @return Cache configurations. 
+ */ + private CacheConfiguration[] interceptorConfigurations() { + CacheConfiguration[] ccfgs = new CacheConfiguration[6]; + + ccfgs[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).setInterceptor(new Interceptor1()); + ccfgs[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).setInterceptor(new Interceptor2()); + ccfgs[2] = cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).setInterceptor(new Interceptor1()); + ccfgs[3] = cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).setInterceptor(new Interceptor2()); + ccfgs[4] = cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false); + ccfgs[5] = cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false); + + return ccfgs; + } + + /** + * Tests caches in the same group with different {@link CacheInterceptor}s. + * + * @throws Exception If failed. + */ + public void testInterceptors() throws Exception { + for (int i = 0; i < 4; i++) { + ccfgs = interceptorConfigurations(); + + startGrid(i); + } + + Ignite node = ignite(0); + + checkInterceptorPut(node.cache("c1"), "v1"); + checkInterceptorPut(node.cache("c2"), "v2"); + checkInterceptorPut(node.cache("c3"), "v1"); + checkInterceptorPut(node.cache("c4"), "v2"); + + checkCache(0, "c5", 10); + checkCache(0, "c6", 10); + } + + /** + * @param cache Cache whose interceptor is expected to replace every put value. + * @param expVal Value expected to be read back after each put. + */ + private void checkInterceptorPut(IgniteCache cache, String expVal) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 10; i++) { + Integer key = rnd.nextInt(); + + cache.put(key, i); + + assertEquals(expVal, cache.get(key)); + } + } + + /** + * @return Six configurations in group {@code GROUP1}: "c1"-"c4" with a store factory, read-through and write-through; "c5" and "c6" without a store. + */ + private CacheConfiguration[] cacheStoreConfigurations() { + CacheConfiguration[] ccfgs = new CacheConfiguration[6]; + + ccfgs[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).
+ setCacheStoreFactory(new StoreFactory1()).setReadThrough(true).setWriteThrough(true); + + ccfgs[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false). + setCacheStoreFactory(new StoreFactory2()).setReadThrough(true).setWriteThrough(true); + + ccfgs[2] = cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false). + setCacheStoreFactory(new StoreFactory1()).setReadThrough(true).setWriteThrough(true); + + ccfgs[3] = cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false). + setCacheStoreFactory(new StoreFactory2()).setReadThrough(true).setWriteThrough(true); + + ccfgs[4] = cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false); + ccfgs[5] = cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false); + + return ccfgs; + } + + /** + * Tests caches in the same group with different {@link CacheStore}s: operations on one cache must not touch the other store's map. + * + * @throws Exception If failed. + */ + public void testCacheStores() throws Exception { + for (int i = 0; i < 4; i++) { + ccfgs = cacheStoreConfigurations(); + + startGrid(i); + } + + Ignite node = ignite(0); + + checkStorePut(node.cache("c1"), Store1.map); + assertTrue(Store2.map.isEmpty()); + + checkStorePut(node.cache("c3"), Store1.map); + assertTrue(Store2.map.isEmpty()); + + Store1.map.clear(); + + checkStorePut(node.cache("c2"), Store2.map); + assertTrue(Store1.map.isEmpty()); + + checkStorePut(node.cache("c4"), Store2.map); + assertTrue(Store1.map.isEmpty()); + + Store2.map.clear(); + + checkCache(0, "c5", 10); + checkCache(0, "c6", 10); + + assertTrue(Store1.map.isEmpty()); + assertTrue(Store2.map.isEmpty()); + } + + /** + * @param cache Cache to read from and write to. + * @param storeMap Map expected to serve reads and receive writes of {@code cache}.
+ */ + private void checkStorePut(IgniteCache cache, ConcurrentHashMap<Object, Object> storeMap) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < 10; i++) { + Integer key = rnd.nextInt(); + + storeMap.put(key, i); + + assertEquals(i, cache.get(key)); + + cache.put(key, 10_000); + + assertEquals(10_000, cache.get(key)); + assertEquals(10_000, storeMap.get(key)); + } + } + + /** + * @return Six configurations in group {@code GROUP1}: "c1" and "c3" with {@link Mapper1}, "c2" and "c4" with {@link Mapper2}, "c5" and "c6" without a custom mapper. + */ + private CacheConfiguration[] mapperConfigurations() { + CacheConfiguration[] ccfgs = new CacheConfiguration[6]; + + ccfgs[0] = cacheConfiguration(GROUP1, "c1", PARTITIONED, ATOMIC, 2, false).setAffinityMapper(new Mapper1()); + ccfgs[1] = cacheConfiguration(GROUP1, "c2", PARTITIONED, ATOMIC, 2, false).setAffinityMapper(new Mapper2()); + ccfgs[2] = cacheConfiguration(GROUP1, "c3", PARTITIONED, TRANSACTIONAL, 2, false).setAffinityMapper(new Mapper1()); + ccfgs[3] = cacheConfiguration(GROUP1, "c4", PARTITIONED, TRANSACTIONAL, 2, false).setAffinityMapper(new Mapper2()); + ccfgs[4] = cacheConfiguration(GROUP1, "c5", PARTITIONED, ATOMIC, 2, false); + ccfgs[5] = cacheConfiguration(GROUP1, "c6", PARTITIONED, TRANSACTIONAL, 2, false); + + return ccfgs; + } + + /** + * @throws Exception If failed. + */ + public void testAffinityMappers() throws Exception { + for (int i = 0; i < 4; i++) { + ccfgs = mapperConfigurations(); + + startGrid(i); + } + + for (int i = 0; i < 4; i++) + checkAffinityMappers(ignite(i)); + + client = true; + + startGrid(4); + + checkAffinityMappers(ignite(4)); + + for (int i = 0; i < 5; i++) { + checkCache(i, "c1", 10); + checkCache(i, "c2", 10); + checkCache(i, "c3", 10); + checkCache(i, "c4", 10); + checkCache(i, "c5", 10); + checkCache(i, "c6", 10); + } + } + + /** + * @param node Node whose affinity for caches "c1"-"c6" is checked.
+ */ + private void checkAffinityMappers(Ignite node) { + Affinity aff1 = node.affinity("c1"); + Affinity aff2 = node.affinity("c2"); + Affinity aff3 = node.affinity("c3"); + Affinity aff4 = node.affinity("c4"); + Affinity aff5 = node.affinity("c5"); + Affinity aff6 = node.affinity("c6"); + + RendezvousAffinityFunction func = new RendezvousAffinityFunction(); + + for (int i = 0; i < 100; i++) { + MapperTestKey1 k = new MapperTestKey1(i, i + 10); + + assertEquals(i, aff1.partition(k)); + assertEquals(i, aff3.partition(k)); + assertEquals(i + 10, aff2.partition(k)); + assertEquals(i + 10, aff4.partition(k)); + + int part; + + if (node.configuration().getMarshaller() instanceof BinaryMarshaller) + part = func.partition(node.binary().toBinary(k)); + else + part = func.partition(k); + + assertEquals(part, aff5.partition(k)); + assertEquals(part, aff6.partition(k)); + } + } + + /** + * Mapper using field {@code p1} of {@link MapperTestKey1}, or of its binary form, as affinity key; other keys map to themselves. + */ + static class Mapper1 implements AffinityKeyMapper { + /** {@inheritDoc} */ + @Override public Object affinityKey(Object key) { + if (key instanceof MapperTestKey1) + return ((MapperTestKey1)key).p1; + else if (key instanceof BinaryObject) + return ((BinaryObject)key).field("p1"); + + return key; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. + } + } + + /** + * Mapper using field {@code p2} of {@link MapperTestKey1}, or of its binary form, as affinity key; other keys map to themselves. + */ + static class Mapper2 implements AffinityKeyMapper { + /** {@inheritDoc} */ + @Override public Object affinityKey(Object key) { + if (key instanceof MapperTestKey1) + return ((MapperTestKey1)key).p2; + else if (key instanceof BinaryObject) + return ((BinaryObject)key).field("p2"); + + return key; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // No-op. + } + } + + /** + * Test key with two int fields; {@link Mapper1} and {@link Mapper2} each use one of them as affinity key. + */ + static class MapperTestKey1 { + /** Field returned by {@link Mapper1}. */ + final int p1; + + /** Field returned by {@link Mapper2}. */ + final int p2; + + /** + * @param p1 Field1. + * @param p2 Field2.
+ */ + public MapperTestKey1(int p1, int p2) { + this.p1 = p1; + this.p2 = p2; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + MapperTestKey1 testKey1 = (MapperTestKey1)o; + + return p1 == testKey1.p1 && p2 == testKey1.p2; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = p1; + res = 31 * res + p2; + return res; + } + } + + /** + * Interceptor whose {@code onBeforePut} always returns {@code "v1"}. + */ + static class Interceptor1 extends CacheInterceptorAdapter<Object, Object> { + /** {@inheritDoc} */ + @Override public Object onBeforePut(Cache.Entry<Object, Object> entry, Object newVal) { + return "v1"; + } + } + + /** + * Interceptor whose {@code onBeforePut} always returns {@code "v2"}. + */ + static class Interceptor2 extends CacheInterceptorAdapter<Object, Object> { + /** {@inheritDoc} */ + @Override public Object onBeforePut(Cache.Entry<Object, Object> entry, Object newVal) { + return "v2"; + } + } + + /** + * Factory creating {@link Store1} instances. + */ + static class StoreFactory1 implements Factory<CacheStore> { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new Store1(); + } + } + + /** + * Store keeping its data in the static {@link #map}. + */ + static class Store1 extends CacheStoreAdapter { + /** Store data; static, so shared by all {@code Store1} instances. */ + static ConcurrentHashMap<Object, Object> map = new ConcurrentHashMap<>(); + + /** {@inheritDoc} */ + @Override public Object load(Object key) throws CacheLoaderException { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) throws CacheWriterException { + map.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + map.remove(key); + } + } + + /** + * Factory creating {@link Store2} instances. + */ + static class StoreFactory2 implements Factory<CacheStore> { + /** {@inheritDoc} */ + @Override public CacheStore create() { + return new Store2(); + } + } + + /** + * Store keeping its data in the static {@link #map}. + */ + static class Store2 extends CacheStoreAdapter { + /** Store data; static, so shared by all {@code Store2} instances. */ + static ConcurrentHashMap<Object, Object> map = new ConcurrentHashMap<>(); + +
/** {@inheritDoc} */ + @Override public Object load(Object key) throws CacheLoaderException { + return map.get(key); + } + + /** {@inheritDoc} */ + @Override public void write(Cache.Entry entry) throws CacheWriterException { + map.put(entry.getKey(), entry.getValue()); + } + + /** {@inheritDoc} */ + @Override public void delete(Object key) throws CacheWriterException { + map.remove(key); + } + } + + /** * @param rnd Random. * @param cache Cache. */
