http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java index abfc325..06cbe79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java @@ -71,6 +71,7 @@ import org.apache.ignite.internal.util.GridSpinBusyLock; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.GridCloseableIterator; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.A; @@ -1119,35 +1120,41 @@ public class GridServiceProcessor extends GridProcessorAdapter { */ @SuppressWarnings("unchecked") private Iterator<Cache.Entry<Object, Object>> serviceEntries(IgniteBiPredicate<Object, Object> p) { - if (!cache.context().affinityNode()) { - ClusterNode oldestSrvNode = - CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE); + try { + if (!cache.context().affinityNode()) { + ClusterNode oldestSrvNode = + CU.oldestAliveCacheServerNode(cache.context().shared(), AffinityTopologyVersion.NONE); - if (oldestSrvNode == null) - return new GridEmptyIterator<>(); + if (oldestSrvNode == null) + return new GridEmptyIterator<>(); - GridCacheQueryManager qryMgr = cache.context().queries(); + GridCacheQueryManager qryMgr = cache.context().queries(); - CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false); + CacheQuery<Map.Entry<Object, Object>> qry = qryMgr.createScanQuery(p, null, false); - qry.keepAll(false); + qry.keepAll(false); - qry.projection(ctx.cluster().get().forNode(oldestSrvNode)); + qry.projection(ctx.cluster().get().forNode(oldestSrvNode)); - return cache.context().itHolder().iterator(qry.execute(), - new CacheIteratorConverter<Object, Map.Entry<Object,Object>>() { - @Override protected Object convert(Map.Entry<Object, Object> e) { - return new CacheEntryImpl<>(e.getKey(), e.getValue()); - } + GridCloseableIterator<Map.Entry<Object, Object>> iter = qry.executeScanQuery(); - @Override protected void remove(Object item) { - throw new UnsupportedOperationException(); - } - } - ); + return cache.context().itHolder().iterator(iter, + new CacheIteratorConverter<Cache.Entry<Object, Object>, Map.Entry<Object,Object>>() { + @Override protected Cache.Entry<Object, Object> convert(Map.Entry<Object, Object> e) { + return new CacheEntryImpl<>(e.getKey(), e.getValue()); + } + + @Override protected void remove(Cache.Entry<Object, Object> item) { + throw new UnsupportedOperationException(); + } + }); + } + else + return cache.entrySetx().iterator(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); } - else - return cache.entrySetx().iterator(); } /**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java index e434b49..0d45324 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedPreloadLifecycleSelfTest.java @@ -17,19 +17,12 @@ package org.apache.ignite.internal.processors.cache.distributed.near; -import java.util.Map; -import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.GridCachePreloadLifecycleAbstractTest; -import org.apache.ignite.internal.processors.cache.query.CacheQuery; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lifecycle.LifecycleBean; import org.apache.ignite.lifecycle.LifecycleEventType; import org.apache.ignite.resources.IgniteInstanceResource; @@ -164,34 +157,6 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo } /** - * @param keys Keys. - * @throws Exception If failed. - */ - public void checkScanQuery(Object[] keys) throws Exception { - preloadMode = SYNC; - - lifecycleBean = lifecycleBean(keys); - - for (int i = 0; i < gridCnt; i++) { - startGrid(i); - - info("Checking '" + (i + 1) + "' nodes..."); - - for (int j = 0; j < G.allGrids().size(); j++) { - GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two"); - - CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false); - - int totalCnt = F.sumInt(qry.execute(new EntryIntegerIgniteReducer()).get()); - - info("Total entry count [grid=" + j + ", totalCnt=" + totalCnt + ']'); - - assertEquals(keys.length / 2, totalCnt); - } - } - } - - /** * @throws Exception If failed. */ public void testLifecycleBean1() throws Exception { @@ -218,69 +183,4 @@ public class GridCachePartitionedPreloadLifecycleSelfTest extends GridCachePrelo public void testLifecycleBean4() throws Exception { checkCache(keys(false, 500)); } - - /** - * @throws Exception If failed. - */ - public void testScanQuery1() throws Exception { - checkScanQuery(keys(true, DFLT_KEYS.length, DFLT_KEYS)); - } - - /** - * @throws Exception If failed. - */ - public void testScanQuery2() throws Exception { - checkScanQuery(keys(false, DFLT_KEYS.length, DFLT_KEYS)); - } - - /** - * @throws Exception If failed. - */ - public void testScanQuery3() throws Exception { - checkScanQuery(keys(true, 500)); - } - - /** - * @throws Exception If failed. - */ - public void testScanQuery4() throws Exception { - checkScanQuery(keys(false, 500)); - } - - /** - * - */ - private static class EntryIntegerIgniteReducer implements IgniteReducer<Map.Entry<Object, MyValue>, Integer> { - @IgniteInstanceResource - private Ignite grid; - - private int cnt; - - @Override public boolean collect(Map.Entry<Object, MyValue> e) { - Object key = e.getKey(); - - assertNotNull(e.getValue()); - - try { - Object v1 = e.getValue(); - Object v2 = grid.cache("one").get(key); - - assertNotNull(v2); - assertEquals(v1, v2); - } - catch (CacheException e1) { - e1.printStackTrace(); - - assert false; - } - - cnt++; - - return true; - } - - @Override public Integer reduce() { - return cnt; - } - } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java index 6a7a68b..f6799d8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/preloader/GridCacheReplicatedPreloadLifecycleSelfTest.java @@ -17,26 +17,17 @@ package org.apache.ignite.internal.processors.cache.distributed.replicated.preloader; -import java.util.Map; import javax.cache.Cache; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.GridCachePreloadLifecycleAbstractTest; -import org.apache.ignite.internal.processors.cache.query.CacheQuery; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; -import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lifecycle.LifecycleBean; import org.apache.ignite.lifecycle.LifecycleEventType; import org.apache.ignite.resources.IgniteInstanceResource; -import org.apache.ignite.resources.LoggerResource; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; @@ -167,40 +158,6 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa } } - - /** - * @param keys Keys. - * @throws Exception If failed. - */ - public void checkScanQuery(Object[] keys) throws Exception { - preloadMode = SYNC; - - lifecycleBean = lifecycleBean(keys); - - for (int i = 0; i < gridCnt; i++) { - startGrid(i); - - info("Checking '" + (i + 1) + "' nodes..."); - - for (int j = 0; j < G.allGrids().size(); j++) { - GridCacheAdapter<Object, MyValue> c2 = ((IgniteKernal)grid(j)).internalCache("two"); - - CacheQuery<Map.Entry<Object, MyValue>> qry = c2.context().queries().createScanQuery(null, null, false); - - final int i0 = j; - final int j0 = i; - - qry = qry.projection(grid(j).cluster()); - - int totalCnt = F.sumInt(qry.execute(new EntryReducer(j0, i0)).get()); - - info("Total entry count [grid=" + j + ", totalCnt=" + totalCnt + ']'); - - assertEquals(keys.length * (i + 1) / 2, totalCnt); - } - } - } - /** * @throws Exception If failed. */ @@ -228,91 +185,4 @@ public class GridCacheReplicatedPreloadLifecycleSelfTest extends GridCachePreloa public void testLifecycleBean4() throws Exception { checkCache(keys(false, 500)); } - - /** - * @throws Exception If failed. - */ - public void testScanQuery1() throws Exception { - checkScanQuery(keys(true, DFLT_KEYS.length, DFLT_KEYS)); - } - - /** - * @throws Exception If failed. - */ - public void testScanQuery2() throws Exception { - checkScanQuery(keys(false, DFLT_KEYS.length, DFLT_KEYS)); - } - - /** - * @throws Exception If failed. - */ - public void testScanQuery3() throws Exception { - checkScanQuery(keys(true, 500)); - } - - /** - * @throws Exception If failed. - */ - public void testScanQuery4() throws Exception { - checkScanQuery(keys(false, 500)); - } - - private static class EntryReducer implements IgniteReducer<Map.Entry<Object, MyValue>, Integer> { - /** */ - private final int j0; - - /** */ - private final int i0; - - /** */ - @IgniteInstanceResource - private Ignite grid; - - /** */ - @LoggerResource - private IgniteLogger log0; - - /** */ - private int cnt; - - /** - */ - public EntryReducer(int j0, int i0) { - this.j0 = j0; - this.i0 = i0; - } - - /** {@inheritDoc} */ - @Override public boolean collect(Map.Entry<Object, MyValue> e) { - if (!quiet && log0.isInfoEnabled()) - log0.info("Collecting entry: " + e); - - Object key = e.getKey(); - - assertNotNull(e.getValue()); - - try { - Object v1 = e.getValue(); - Object v2 = ((IgniteKernal)grid).getCache("one").get(key); - - assertNotNull("Cache c1 misses value for key [i=" + j0 + ", j=" + i0 + ", missedKey=" + - key + ", cache=" + ((IgniteKernal)grid).getCache("one").values() + ']', v2); - assertEquals(v1, v2); - } - catch (IgniteCheckedException e1) { - e1.printStackTrace(); - - assert false; - } - - cnt++; - - return true; - } - - /** {@inheritDoc} */ - @Override public Integer reduce() { - return cnt; - } - } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java b/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java index 5c88f4e..b20cb0e 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/configvariations/ConfigVariationsTestSuiteBuilder.java @@ -197,9 +197,9 @@ public class ConfigVariationsTestSuiteBuilder { TestSuite addedSuite; if (testedNodeCnt > 1) - addedSuite = createMultiNodeTestSuite((Class<? extends IgniteCacheConfigVariationsAbstractTest>)cls, + addedSuite = createMultiNodeTestSuite((Class<? extends IgniteCacheConfigVariationsAbstractTest>)cls, testCfg, testedNodeCnt, withClients, skipWaitPartMapExchange); - else + else addedSuite = new IgniteConfigVariationsTestSuite(cls, testCfg); return addedSuite; http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java index acf4a05..6b52578 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteCacheConfigVariationsAbstractTest.java @@ -132,7 +132,7 @@ public abstract class IgniteCacheConfigVariationsAbstractTest extends IgniteConf info(">>> Starting set of tests [testedNodeIdx=" + testedNodeIdx + ", id=" + grid(testedNodeIdx).localNode().id() - + ", isClient=" + grid(testedNodeIdx).configuration().isClientMode() + + ", isClient=" + isClientMode() + ", nearEnabled=" + testedNodeNearEnabled + "]"); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java index 3370421..d70b606 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/IgniteConfigVariationsAbstractTest.java @@ -58,7 +58,7 @@ public abstract class IgniteConfigVariationsAbstractTest extends GridCommonAbstr protected VariationsTestsConfig testsCfg; /** */ - protected volatile DataMode dataMode; + protected volatile DataMode dataMode = DataMode.PLANE_OBJECT; /** * @param testsCfg Tests configuration. @@ -198,6 +198,27 @@ public abstract class IgniteConfigVariationsAbstractTest extends GridCommonAbstr } /** + * @return Tested grid in client mode or not. + */ + protected boolean isClientMode() { + return grid(testedNodeIdx).configuration().isClientMode(); + } + + /** + * @return Count of server nodes at topology. + */ + protected int serversGridCount() { + int cnt = 0; + + for (int i = 0; i < gridCount(); i++) { + if (!grid(i).configuration().isClientMode()) + cnt++; + } + + return cnt; + } + + /** * Runs in all data modes. */ protected void runInAllDataModes(TestRunnable call) throws Exception { @@ -228,6 +249,7 @@ public abstract class IgniteConfigVariationsAbstractTest extends GridCommonAbstr /** * @param keyId Key Id. * @return Key. + * @see #valueOf(Object) */ public Object key(int keyId) { return key(keyId, dataMode); @@ -236,6 +258,7 @@ public abstract class IgniteConfigVariationsAbstractTest extends GridCommonAbstr /** * @param valId Key Id. * @return Value. + * @see #valueOf(Object) */ public Object value(int valId) { return value(valId, dataMode); @@ -272,7 +295,7 @@ public abstract class IgniteConfigVariationsAbstractTest extends GridCommonAbstr if (obj instanceof TestObject) return ((TestObject)obj).value(); else - throw new IllegalStateException(); + throw new IllegalArgumentException("Unknown tested object type: " + obj); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/indexing/pom.xml ---------------------------------------------------------------------- diff --git a/modules/indexing/pom.xml b/modules/indexing/pom.xml index 22a52b2..899d4a7 100644 --- a/modules/indexing/pom.xml +++ b/modules/indexing/pom.xml @@ -68,6 +68,13 @@ </dependency> <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.4</version> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java index 28eef90..d2d8c4d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheAbstractQueryMetricsSelfTest.java @@ -84,6 +84,7 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra cacheCfg1.setCacheMode(cacheMode); cacheCfg1.setWriteSynchronizationMode(FULL_SYNC); cacheCfg1.setIndexedTypes(String.class, Integer.class); + cacheCfg1.setStatisticsEnabled(true); CacheConfiguration<String, Integer> cacheCfg2 = defaultCacheConfiguration(); @@ -91,6 +92,7 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra cacheCfg2.setCacheMode(cacheMode); cacheCfg2.setWriteSynchronizationMode(FULL_SYNC); cacheCfg2.setIndexedTypes(String.class, Integer.class); + cacheCfg2.setStatisticsEnabled(true); cfg.setCacheConfiguration(cacheCfg1, cacheCfg2); @@ -347,4 +349,4 @@ public abstract class CacheAbstractQueryMetricsSelfTest extends GridCommonAbstra } }, 5000); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java index 2b2020d..65d479d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractQuerySelfTest.java @@ -867,7 +867,13 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac public void testScanQuery() throws Exception { IgniteCache<Integer, String> c1 = ignite().cache(null); - c1.put(777, "value"); + Map<Integer, String> map = new HashMap(){{ + for (int i = 0; i < 5000; i++) + put(i, "str" + i); + }}; + + for (Map.Entry<Integer, String> e : map.entrySet()) + c1.put(e.getKey(), e.getValue()); // Scan query. QueryCursor<Cache.Entry<Integer, String>> qry = c1.query(new ScanQuery<Integer, String>()); @@ -876,16 +882,21 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac assert iter != null; - int expCnt = 1; + int cnt = 0; + + while (iter.hasNext()) { + Cache.Entry<Integer, String> e = iter.next(); + + String expVal = map.get(e.getKey()); + + assertNotNull(expVal); - for (int i = 0; i < expCnt; i++) { - Cache.Entry<Integer, String> e1 = iter.next(); + assertEquals(expVal, e.getValue()); - assertEquals(777, e1.getKey().intValue()); - assertEquals("value", e1.getValue()); + cnt++; } - assert !iter.hasNext(); + assertEquals(map.size(), cnt); } /** @@ -1308,13 +1319,6 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac * @throws Exception If failed. */ public void testScanQueryEvents() throws Exception { - checkScanQueryEvents(); - } - - /** - * @throws Exception If failed. - */ - private void checkScanQueryEvents() throws Exception { final Map<Integer, Integer> map = new ConcurrentHashMap8<>(); final CountDownLatch latch = new CountDownLatch(10); final CountDownLatch execLatch = new CountDownLatch(cacheMode() == REPLICATED ? 1 : gridCount()); @@ -1980,4 +1984,4 @@ public abstract class IgniteCacheAbstractQuerySelfTest extends GridCommonAbstrac /** */ TYPE_B } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsQueryTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsQueryTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsQueryTest.java new file mode 100644 index 0000000..4e6af25 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsQueryTest.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import javax.cache.Cache; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.query.ScanQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.CacheQueryExecutedEvent; +import org.apache.ignite.events.CacheQueryReadEvent; +import org.apache.ignite.events.Event; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.testframework.junits.IgniteCacheConfigVariationsAbstractTest; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_EXECUTED; +import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ; +import static org.apache.ignite.internal.processors.cache.query.CacheQueryType.SCAN; + +/** + * Config Variations query tests. + */ +public class IgniteCacheConfigVariationsQueryTest extends IgniteCacheConfigVariationsAbstractTest { + /** */ + public static final int CNT = 50; + + /** */ + private Map<Object, Object> evtMap; + + /** */ + private CountDownLatch readEvtLatch; + + /** */ + private CountDownLatch execEvtLatch; + + /** */ + private IgnitePredicate[] objReadLsnrs; + + /** */ + private IgnitePredicate[] qryExecLsnrs; + + /** */ + private Map<Object, Object> expMap; + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("serial") + public void testScanQuery() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + try { + IgniteCache<Object, Object> cache = jcache(); + + Map<Object, Object> map = new HashMap<Object, Object>() {{ + for (int i = 0; i < CNT; i++) + put(key(i), value(i)); + }}; + + registerEventListeners(map); + + for (Map.Entry<Object, Object> e : map.entrySet()) + cache.put(e.getKey(), e.getValue()); + + // Scan query. + QueryCursor<Cache.Entry<Object, Object>> qry = cache.query(new ScanQuery()); + + checkQueryResults(map, qry); + } + finally { + stopListeners(); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testScanPartitionQuery() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + IgniteCache<Object, Object> cache = jcache(); + + GridCacheContext cctx = ((IgniteCacheProxy)cache).context(); + + Map<Integer, Map<Object, Object>> entries = new HashMap<>(); + + for (int i = 0; i < CNT; i++) { + Object key = key(i); + Object val = value(i); + + cache.put(key, val); + + int part = cctx.affinity().partition(key); + + Map<Object, Object> partEntries = entries.get(part); + + if (partEntries == null) + entries.put(part, partEntries = new HashMap<>()); + + partEntries.put(key, val); + } + + for (int i = 0; i < cctx.affinity().partitions(); i++) { + try { + Map<Object, Object> exp = entries.get(i); + + if (exp == null) + System.out.println(); + + registerEventListeners(exp); + + ScanQuery<Object, Object> scan = new ScanQuery<>(i); + + Collection<Cache.Entry<Object, Object>> actual = cache.query(scan).getAll(); + + assertEquals("Failed for partition: " + i, exp == null ? 0 : exp.size(), actual.size()); + + if (exp != null) { + for (Cache.Entry<Object, Object> entry : actual) + assertTrue(entry.getValue().equals(exp.get(entry.getKey()))); + } + + checkEvents(); + } + finally { + stopListeners(); + } + } + } + }); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("SubtractionInCompareTo") + public void testScanFilters() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + try { + IgniteCache<Object, Object> cache = jcache(); + + IgniteBiPredicate<Object, Object> p = new IgniteBiPredicate<Object, Object>() { + @Override public boolean apply(Object k, Object v) { + assertNotNull(k); + assertNotNull(v); + + return valueOf(k) >= 20 && valueOf(v) < 40; + } + }; + + Map<Object, Object> exp = new HashMap<>(); + + for (int i = 0; i < CNT; i++) { + Object key = key(i); + Object val = value(i); + + cache.put(key, val); + + if (p.apply(key, val)) + exp.put(key, val); + } + + registerEventListeners(exp, true); + + QueryCursor<Cache.Entry<Object, Object>> q = cache.query(new ScanQuery<>(p)); + + checkQueryResults(exp, q); + } + finally { + stopListeners(); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("SubtractionInCompareTo") + public void testLocalScanQuery() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + try { + IgniteCache<Object, Object> cache = jcache(); + + ClusterNode locNode = testedGrid().cluster().localNode(); + Affinity<Object> affinity = testedGrid().affinity(cacheName()); + + Map<Object, Object> map = new HashMap<>(); + + for (int i = 0; i < CNT; i++) { + Object key = key(i); + Object val = value(i); + + cache.put(key, val); + + if (!isClientMode() && (cacheMode() == REPLICATED || affinity.isPrimary(locNode, key))) + map.put(key, val); + } + + registerEventListeners(map); + + QueryCursor<Cache.Entry<Object, Object>> q = cache.query(new ScanQuery<>().setLocal(true)); + + checkQueryResults(map, q); + } + finally { + stopListeners(); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("SubtractionInCompareTo") + public void testScanQueryLocalFilter() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + try { + IgniteCache<Object, Object> cache = jcache(); + + ClusterNode locNode = testedGrid().cluster().localNode(); + + Map<Object, Object> map = new HashMap<>(); + + IgniteBiPredicate<Object, Object> filter = new IgniteBiPredicate<Object, Object>() { + @Override public boolean apply(Object k, Object v) { + assertNotNull(k); + assertNotNull(v); + + return valueOf(k) >= 20 && valueOf(v) < 40; + } + }; + + for (int i = 0; i < CNT; i++) { + Object key = key(i); + Object val = value(i); + + cache.put(key, val); + + if (!isClientMode() && (cacheMode() == REPLICATED + || testedGrid().affinity(cacheName()).isPrimary(locNode, key)) && filter.apply(key, val)) + map.put(key, val); + } + + registerEventListeners(map, true); + + QueryCursor<Cache.Entry<Object, Object>> q = cache.query(new ScanQuery<>(filter).setLocal(true)); + + checkQueryResults(map, q); + } + finally { + stopListeners(); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + @SuppressWarnings("SubtractionInCompareTo") + public void testScanQueryPartitionFilter() throws Exception { + runInAllDataModes(new TestRunnable() { + @Override public void run() throws Exception { + IgniteCache<Object, Object> cache = jcache(); + + Affinity<Object> affinity = testedGrid().affinity(cacheName()); + + Map<Integer, Map<Object, Object>> partMap = new HashMap<>(); + + IgniteBiPredicate<Object, Object> filter = new IgniteBiPredicate<Object, Object>() { + @Override public boolean apply(Object k, Object v) { + assertNotNull(k); + assertNotNull(v); + + return valueOf(k) >= 20 && valueOf(v) < 40; + } + }; + + for (int i = 0; i < CNT; i++) { + Object key = key(i); + Object val = value(i); + + cache.put(key, val); + + if (filter.apply(key, val)) { + int part = affinity.partition(key); + + Map<Object, Object> map = partMap.get(part); + + if (map == null) + partMap.put(part, map = new HashMap<>()); + + map.put(key, val); + } + } + + for (int part = 0; part < affinity.partitions(); part++) { + try { + Map<Object, Object> expMap = partMap.get(part); + + expMap = expMap == null ? Collections.emptyMap() : expMap; + + registerEventListeners(expMap, true); + + QueryCursor<Cache.Entry<Object, Object>> q = cache.query(new ScanQuery<>(part, filter)); + + checkQueryResults(expMap, q); + } + finally { + stopListeners(); + } + } + } + }); + } + + /** + * @param expMap Expected map. + * @param cursor Query cursor. + */ + private void checkQueryResults(Map<Object, Object> expMap, QueryCursor<Cache.Entry<Object, Object>> cursor) + throws InterruptedException { + Iterator<Cache.Entry<Object, Object>> iter = cursor.iterator(); + + try { + assertNotNull(iter); + + int cnt = 0; + + while (iter.hasNext()) { + Cache.Entry<Object, Object> e = iter.next(); + + assertNotNull(e.getKey()); + assertNotNull(e.getValue()); + + Object expVal = expMap.get(e.getKey()); + + assertNotNull("Failed to resolve expected value for key: " + e.getKey(), expVal); + + assertEquals(expVal, e.getValue()); + + cnt++; + } + + assertEquals(expMap.size(), cnt); + } + finally { + cursor.close(); + } + + checkEvents(); + } + + /** + * Registers event listeners. + * @param expMap Expected read events count. + */ + private void registerEventListeners(Map<Object, Object> expMap) { + registerEventListeners(expMap, false); + } + + /** + * Registers event listeners. + * @param expMap Expected read events count. + * @param filterExp Scan query uses filter. + */ + private void registerEventListeners(Map<Object, Object> expMap, final boolean filterExp) { + this.expMap = expMap != null ? expMap : Collections.emptyMap(); + + Set<ClusterNode> affNodes= new HashSet<>(); + + if (cacheMode() != REPLICATED) { + Affinity<Object> aff = testedGrid().affinity(cacheName()); + + for (Object key : this.expMap.keySet()) + affNodes.add(aff.mapKeyToNode(key)); + } + + int execEvtCnt = cacheMode() == REPLICATED || (cacheMode() == PARTITIONED && affNodes.isEmpty()) ? 1 : affNodes.size(); + + evtMap = new ConcurrentHashMap<>(); + readEvtLatch = new CountDownLatch(this.expMap.size()); + execEvtLatch = new CountDownLatch(execEvtCnt); + + objReadLsnrs = new IgnitePredicate[gridCount()]; + qryExecLsnrs = new IgnitePredicate[gridCount()]; + + for (int i = 0; i < gridCount(); i++) { + IgnitePredicate<Event> pred = new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + assertTrue("Event: " + evt, evt instanceof CacheQueryReadEvent); + + CacheQueryReadEvent<Object, Object> qe = (CacheQueryReadEvent<Object, Object>)evt; + + assertEquals(SCAN.name(), qe.queryType()); + assertEquals(cacheName(), qe.cacheName()); + + assertNull(qe.className()); + assertNull(qe.clause()); + assertEquals(filterExp, qe.scanQueryFilter() != null); + assertNull(qe.continuousQueryFilter()); + assertNull(qe.arguments()); + + evtMap.put(qe.key(), qe.value()); + + assertFalse(readEvtLatch.getCount() == 0); + + readEvtLatch.countDown(); + + return true; + } + }; + + grid(i).events().localListen(pred, EVT_CACHE_QUERY_OBJECT_READ); + + objReadLsnrs[i] = pred; + + IgnitePredicate<Event> execPred = new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + assertTrue("Event: " + evt, evt instanceof CacheQueryExecutedEvent); + + CacheQueryExecutedEvent qe = (CacheQueryExecutedEvent)evt; + + assertEquals(SCAN.name(), qe.queryType()); + assertEquals(cacheName(), qe.cacheName()); + + assertNull(qe.className()); + assertNull(qe.clause()); + assertEquals(filterExp, qe.scanQueryFilter() != null); + assertNull(qe.continuousQueryFilter()); + assertNull(qe.arguments()); + + assertFalse("Too many events.", execEvtLatch.getCount() == 0); + + execEvtLatch.countDown(); + + return true; + } + }; + + grid(i).events().localListen(execPred, EVT_CACHE_QUERY_EXECUTED); + + qryExecLsnrs[i] = execPred; + } + } + + /** + * Stops listenening. + */ + private void stopListeners() { + for (int i = 0; i < gridCount(); i++) { + grid(i).events().stopLocalListen(objReadLsnrs[i]); + grid(i).events().stopLocalListen(qryExecLsnrs[i]); + } + } + + /** + * @throws InterruptedException If failed. + */ + private void checkEvents() throws InterruptedException { + assertTrue(execEvtLatch.await(1000, MILLISECONDS)); + assertTrue(readEvtLatch.await(1000, MILLISECONDS)); + + assertEquals(expMap.size(), evtMap.size()); + + for (Map.Entry<Object, Object> e : expMap.entrySet()) + assertEquals(e.getValue(), evtMap.get(e.getKey())); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e8072f8/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheConfigVariationQueryTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheConfigVariationQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheConfigVariationQueryTestSuite.java new file mode 100644 index 0000000..83ae27f --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheConfigVariationQueryTestSuite.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.cache.IgniteCacheConfigVariationsQueryTest; +import org.apache.ignite.testframework.configvariations.ConfigVariationsTestSuiteBuilder; + +/** + * Test suite for cache queries. + */ +public class IgniteCacheConfigVariationQueryTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception If failed. + */ + public static TestSuite suite() throws Exception { + return new ConfigVariationsTestSuiteBuilder( + "Cache Config Variations Query Test Suite", + IgniteCacheConfigVariationsQueryTest.class) + .withBasicCacheParams() + .gridsCount(5).backups(1) + .testedNodesCount(3).withClients() + .build(); + } +}
