Repository: ignite Updated Branches: refs/heads/ignite-5937 0014b2f6e -> b1e50296d
ignite-5937 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1e50296 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1e50296 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1e50296 Branch: refs/heads/ignite-5937 Commit: b1e50296d38b92a70cb058d7f2b471de4c3cadaf Parents: 0014b2f Author: sboikov <[email protected]> Authored: Thu Oct 19 16:11:23 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Oct 19 18:24:52 2017 +0300 ---------------------------------------------------------------------- .../cache/mvcc/CacheMvccAbstractTest.java | 54 +- .../processors/query/h2/database/H2Tree.java | 20 +- .../query/h2/database/H2TreeIndex.java | 12 + .../cache/mvcc/CacheMvccSqlQueriesTest.java | 540 ++++++++++++++++--- 4 files changed, 526 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b1e50296/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java index 3078655..ced6dfe 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.mvcc; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -332,13 +333,15 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { Map<Integer, Integer> lastUpdateCntrs = new HashMap<>(); + SqlFieldsQuery sumQry = new SqlFieldsQuery("select sum(val) from MvccTestAccount"); + while (!stop.get()) { while (keys.size() < ACCOUNTS) keys.add(rnd.nextInt(ACCOUNTS)); TestCache<Integer, MvccTestAccount> cache = randomCache(caches, rnd); - Map<Integer, MvccTestAccount> accounts; + Map<Integer, MvccTestAccount> accounts = null; try { switch (readMode) { @@ -389,6 +392,18 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { break; } + case SQL_SUM: { + List<List<?>> res = cache.cache.query(sumQry).getAll(); + + assertEquals(1, res.size()); + + BigDecimal sum = (BigDecimal)res.get(0).get(0); + + assertEquals(ACCOUNT_START_VAL * ACCOUNTS, sum.intValue()); + + break; + } + default: { fail(); @@ -400,29 +415,31 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { cache.readUnlock(); } - if (!withRmvs) - assertEquals(ACCOUNTS, accounts.size()); + if (accounts != null) { + if (!withRmvs) + assertEquals(ACCOUNTS, accounts.size()); - int sum = 0; + int sum = 0; - for (int i = 0; i < ACCOUNTS; i++) { - MvccTestAccount account = accounts.get(i); + for (int i = 0; i < ACCOUNTS; i++) { + MvccTestAccount account = accounts.get(i); - if (account != null) { - sum += account.val; + if (account != null) { + sum += account.val; - Integer cntr = lastUpdateCntrs.get(i); + Integer cntr = lastUpdateCntrs.get(i); - if (cntr != null) - assertTrue(cntr <= account.updateCnt); + if (cntr != null) + assertTrue(cntr <= account.updateCnt); - lastUpdateCntrs.put(i, cntr); + lastUpdateCntrs.put(i, cntr); + } + else + assertTrue(withRmvs); } - else - assertTrue(withRmvs); - } - assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum); + assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum); + } } if (idx == 0) { @@ -827,7 +844,10 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { SCAN, /** */ - SQL_ALL + SQL_ALL, + + /** */ + SQL_SUM } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/b1e50296/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java index 03c5c68..9231775 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java @@ -267,9 +267,14 @@ public abstract class H2Tree extends BPlusTree<GridH2SearchRow, GridH2Row> { return mvccCompare(r1, r2); } + /** + * @param io IO. + * @param pageAddr Page address. + * @param idx Item index. + * @param r2 Search row. + * @return Comparison result. + */ private int mvccCompare(H2RowLinkIO io, long pageAddr, int idx, GridH2SearchRow r2) { - int c = 0; - if (mvccEnabled && !r2.indexSearchRow()) { long crdVer1 = io.getMvccCoordinatorVersion(pageAddr, idx); long crdVer2 = r2.mvccCoordinatorVersion(); @@ -277,7 +282,7 @@ public abstract class H2Tree extends BPlusTree<GridH2SearchRow, GridH2Row> { assert crdVer1 != 0; assert crdVer2 != 0 : r2; - c = Long.compare(unmaskCoordinatorVersion(crdVer1), unmaskCoordinatorVersion(crdVer2)); + int c = Long.compare(unmaskCoordinatorVersion(crdVer1), unmaskCoordinatorVersion(crdVer2)); if (c != 0) return c; @@ -287,10 +292,10 @@ public abstract class H2Tree extends BPlusTree<GridH2SearchRow, GridH2Row> { assert cntr != MVCC_COUNTER_NA; assert r2.mvccCounter() != MVCC_COUNTER_NA : r2; - c = Long.compare(cntr, r2.mvccCounter()); + return Long.compare(cntr, r2.mvccCounter()); } - return c; + return 0; } /** @@ -314,10 +319,7 @@ public abstract class H2Tree extends BPlusTree<GridH2SearchRow, GridH2Row> { assert r1.mvccCounter() != MVCC_COUNTER_NA : r1; assert r2.mvccCounter() != MVCC_COUNTER_NA : r2; - c = Long.compare(r1.mvccCounter(), r2.mvccCounter()); - - if (c != 0) - return c; + return Long.compare(r1.mvccCounter(), r2.mvccCounter()); } return 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/b1e50296/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java index 72b6e2a..cdaa5b0 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java @@ -321,6 +321,18 @@ public class H2TreeIndex extends GridH2IndexBase { H2Tree tree = treeForRead(seg); + if (cctx.mvccEnabled()) { + GridH2QueryContext qctx = GridH2QueryContext.get(); + + assert qctx != null; + + H2TreeMvccFilterClosure mvccFilter = qctx.mvccFilter(); + + assert mvccFilter != null; + + // TODO IGNITE-3478 + } + GridH2Row row = b ? tree.findFirst(): tree.findLast(); return new SingleRowCursor(row); http://git-wip-us.apache.org/repos/asf/ignite/blob/b1e50296/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java index 08b7552..5115eb1 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java @@ -18,11 +18,14 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.io.Serializable; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.processor.MutableEntry; @@ -71,6 +74,13 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { /** * @throws Exception If failed. */ + public void testAccountsTxSumSql_SingleNode() throws Exception { + accountsTxReadAll(1, 0, 0, 64, new InitIndexing(Integer.class, MvccTestAccount.class), false, ReadMode.SQL_SUM); + } + + /** + * @throws Exception If failed. + */ public void testAccountsTxSql_WithRemoves_SingleNode() throws Exception { accountsTxReadAll(1, 0, 0, 64, new InitIndexing(Integer.class, MvccTestAccount.class), true, ReadMode.SQL_ALL); } @@ -78,7 +88,29 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { /** * @throws Exception If failed. */ - public void testUpdateSingleValue() throws Exception { + public void testAccountsTxSql_ClientServer_Backups2() throws Exception { + accountsTxReadAll(4, 2, 2, 64, new InitIndexing(Integer.class, MvccTestAccount.class), false, ReadMode.SQL_ALL); + } + + /** + * @throws Exception If failed. + */ + public void testUpdateSingleValue_SingleNode() throws Exception { + updateSingleValue(true); + } + + /** + * @throws Exception If failed. + */ + public void testUpdateSingleValue_ClientServer() throws Exception { + updateSingleValue(false); + } + + /** + * @param singleNode {@code True} for test with single node. + * @throws Exception If failed. + */ + private void updateSingleValue(boolean singleNode) throws Exception { final int VALS = 100; final int writers = 4; @@ -99,139 +131,445 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { }; GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = - new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { - @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { - ThreadLocalRandom rnd = ThreadLocalRandom.current(); + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - int cnt = 0; + int cnt = 0; - while (!stop.get()) { - TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd); + while (!stop.get()) { + TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd); - try { - Integer key = rnd.nextInt(VALS); + try { + Integer key = rnd.nextInt(VALS); - cache.cache.invoke(key, new CacheEntryProcessor<Integer, MvccTestSqlIndexValue, Object>() { - @Override public Object process(MutableEntry<Integer, MvccTestSqlIndexValue> e, Object... args) { - Integer key = e.getKey(); + cache.cache.invoke(key, new CacheEntryProcessor<Integer, MvccTestSqlIndexValue, Object>() { + @Override public Object process(MutableEntry<Integer, MvccTestSqlIndexValue> e, Object... args) { + Integer key = e.getKey(); - MvccTestSqlIndexValue val = e.getValue(); + MvccTestSqlIndexValue val = e.getValue(); - int newIdxVal; + int newIdxVal; - if (val.idxVal1 < INC_BY) { - assertEquals(key.intValue(), val.idxVal1); + if (val.idxVal1 < INC_BY) { + assertEquals(key.intValue(), val.idxVal1); - newIdxVal = val.idxVal1 + INC_BY; - } - else { - assertEquals(INC_BY + key, val.idxVal1); + newIdxVal = val.idxVal1 + INC_BY; + } + else { + assertEquals(INC_BY + key, val.idxVal1); - newIdxVal = key; - } + newIdxVal = key; + } - e.setValue(new MvccTestSqlIndexValue(newIdxVal)); + e.setValue(new MvccTestSqlIndexValue(newIdxVal)); - return null; - } - }); + return null; + } + }); + } + finally { + cache.readUnlock(); + } + } + + info("Writer finished, updates: " + cnt); + } + }; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + SqlFieldsQuery[] qrys = new SqlFieldsQuery[3]; + + qrys[0] = new SqlFieldsQuery( + "select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=?"); + + qrys[1] = new SqlFieldsQuery( + "select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=? or idxVal1=?"); + + qrys[2] = new SqlFieldsQuery( + "select _key, idxVal1 from MvccTestSqlIndexValue where _key=?"); + + while (!stop.get()) { + Integer key = rnd.nextInt(VALS); + + int qryIdx = rnd.nextInt(3); + + TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd); + + List<List<?>> res; + + try { + SqlFieldsQuery qry = qrys[qryIdx]; + + if (qryIdx == 1) + qry.setArgs(key, key + INC_BY); + else + qry.setArgs(key); + + res = cache.cache.query(qry).getAll(); + } + finally { + cache.readUnlock(); + } + + assertTrue(qryIdx == 0 || !res.isEmpty()); + + if (!res.isEmpty()) { + assertEquals(1, res.size()); + + List<?> resVals = res.get(0); + + Integer key0 = (Integer)resVals.get(0); + Integer val0 = (Integer)resVals.get(1); + + assertEquals(key, key0); + assertTrue(val0.equals(key) || val0.equals(key + INC_BY)); + } + } + + if (idx == 0) { + SqlFieldsQuery qry = new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue"); + + TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd); + + List<List<?>> res; + + try { + res = cache.cache.query(qry).getAll(); + } + finally { + cache.readUnlock(); + } + + assertEquals(VALS, res.size()); + + for (List<?> vals : res) + info("Value: " + vals); + } + } + }; + + int srvs; + int clients; + + if (singleNode) { + srvs = 1; + clients = 0; + } + else { + srvs = 4; + clients = 2; + } + + readWriteTest( + null, + srvs, + clients, + 0, + DFLT_PARTITION_COUNT, + writers, + readers, + DFLT_TEST_TIME, + new InitIndexing(Integer.class, MvccTestSqlIndexValue.class), + init, + writer, + reader); + } + + /** + * @throws Exception If failed. + */ + public void testCountTransactional_SingleNode() throws Exception { + countTransactional(true); + } + + /** + * @throws Exception If failed. + */ + public void testCountTransactional_ClientServer() throws Exception { + countTransactional(false); + } + + /** + * @param singleNode {@code True} for test with single node. + * @throws Exception If failed. + */ + private void countTransactional(boolean singleNode) throws Exception { + final int writers = 4; + + final int readers = 4; + + final int THREAD_KEY_RANGE = 100; + + final int VAL_RANGE = 10; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int min = idx * THREAD_KEY_RANGE; + int max = min + THREAD_KEY_RANGE; + + info("Thread range [min=" + min + ", max=" + max + ']'); + + int cnt = 0; + + Set<Integer> keys = new LinkedHashSet<>(); + + while (!stop.get()) { + TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd); + + try { + // Add or remove 10 keys. + if (!keys.isEmpty() && (keys.size() == THREAD_KEY_RANGE || rnd.nextInt(3) == 0 )) { + Set<Integer> rmvKeys = new HashSet<>(); + + for (Integer key : keys) { + rmvKeys.add(key); + + if (rmvKeys.size() == 10) + break; + } + + assertEquals(10, rmvKeys.size()); + + cache.cache.removeAll(rmvKeys); + + keys.removeAll(rmvKeys); } - finally { - cache.readUnlock(); + else { + TreeMap<Integer, MvccTestSqlIndexValue> map = new TreeMap<>(); + + while (map.size() != 10) { + Integer key = rnd.nextInt(min, max); + + if (keys.add(key)) + map.put(key, new MvccTestSqlIndexValue(rnd.nextInt(VAL_RANGE))); + } + + assertEquals(10, map.size()); + + cache.cache.putAll(map); } } - - info("Writer finished, updates: " + cnt); + finally { + cache.readUnlock(); + } } - }; + + info("Writer finished, updates: " + cnt); + } + }; GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + List<SqlFieldsQuery> qrys = new ArrayList<>(); + + qrys.add(new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue")); + + qrys.add(new SqlFieldsQuery( + "select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0 and idxVal1 <= " + VAL_RANGE)); + + while (!stop.get()) { + TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd); + + try { + for (SqlFieldsQuery qry : qrys) { + List<List<?>> res = cache.cache.query(qry).getAll(); + + assertEquals(1, res.size()); + + Long cnt = (Long)res.get(0).get(0); + + assertTrue(cnt % 10 == 0); + } + } + finally { + cache.readUnlock(); + } + } + } + }; + + int srvs; + int clients; + + if (singleNode) { + srvs = 1; + clients = 0; + } + else { + srvs = 4; + clients = 2; + } + + readWriteTest( + null, + srvs, + clients, + 0, + DFLT_PARTITION_COUNT, + writers, + readers, + DFLT_TEST_TIME, + new InitIndexing(Integer.class, MvccTestSqlIndexValue.class), + null, + writer, + reader); + } + + /** + * @throws Exception If failed. + */ + public void testMaxTransactional_SingleNode() throws Exception { + maxMinTransactional(true); + } + + /** + * @throws Exception If failed. + */ + public void testMaxTransactional_ClientServer() throws Exception { + maxMinTransactional(false); + } + + /** + * @param singleNode {@code True} for test with single node. + * @throws Exception If failed. + */ + private void maxMinTransactional(boolean singleNode) throws Exception { + final int writers = 1; + + final int readers = 1; + + final int THREAD_OPS = 10; + + final int OP_RANGE = 10; + + final int THREAD_KEY_RANGE = OP_RANGE * THREAD_OPS; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); - SqlFieldsQuery[] qrys = new SqlFieldsQuery[3]; + int min = idx * THREAD_KEY_RANGE; - qrys[0] = new SqlFieldsQuery( - "select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=?"); + info("Thread range [start=" + min + ']'); + + int cnt = 0; - qrys[1] = new SqlFieldsQuery( - "select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=? or idxVal1=?"); + boolean add = true; - qrys[2] = new SqlFieldsQuery( - "select _key, idxVal1 from MvccTestSqlIndexValue where _key=?"); + int op = 0; while (!stop.get()) { - Integer key = rnd.nextInt(VALS); + TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd); - int qryIdx = rnd.nextInt(3); + try { + int startKey = min + op * OP_RANGE; - TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd); + if (add) { + Map<Integer, MvccTestSqlIndexValue> vals = new HashMap<>(); - List<List<?>> res; + for (int i = 0; i < 10; i++) { + Integer key = startKey + i + 1; - try { - SqlFieldsQuery qry = qrys[qryIdx]; + vals.put(key, new MvccTestSqlIndexValue(key)); + } + + cache.cache.putAll(vals); - if (qryIdx == 1) - qry.setArgs(key, key + INC_BY); - else - qry.setArgs(key); + info("put " + vals.keySet()); + } + else { + Set<Integer> rmvKeys = new HashSet<>(); - res = cache.cache.query(qry).getAll(); + for (int i = 0; i < 10; i++) + rmvKeys.add(startKey + i + 1); + + cache.cache.removeAll(rmvKeys); + + info("remove " + rmvKeys); + } + + if (++op == THREAD_OPS) { + add = !add; + + op = 0; + } } finally { cache.readUnlock(); } + } - assertTrue(qryIdx == 0 || !res.isEmpty()); - - if (!res.isEmpty()) { - assertEquals(1, res.size()); + info("Writer finished, updates: " + cnt); + } + }; - List<?> resVals = res.get(0); + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); - Integer key0 = (Integer)resVals.get(0); - Integer val0 = (Integer)resVals.get(1); + List<SqlFieldsQuery> qrys = new ArrayList<>(); - assertEquals(key, key0); - assertTrue(val0.equals(key) || val0.equals(key + INC_BY)); - } - } + qrys.add(new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue")); - if (idx == 0) { - SqlFieldsQuery qry = new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue"); + qrys.add(new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue")); + while (!stop.get()) { TestCache<Integer, MvccTestSqlIndexValue> cache = randomCache(caches, rnd); - List<List<?>> res; - try { - res = cache.cache.query(qry).getAll(); + for (SqlFieldsQuery qry : qrys) { + List<List<?>> res = cache.cache.query(qry).getAll(); + + assertEquals(1, res.size()); + + Integer m = (Integer)res.get(0).get(0); + + assertTrue(m == null || m % 10 == 0); + } } finally { cache.readUnlock(); } - - assertEquals(VALS, res.size()); - - for (List<?> vals : res) - info("Value: " + vals); } } }; + int srvs; + int clients; + + if (singleNode) { + srvs = 1; + clients = 0; + } + else { + srvs = 4; + clients = 2; + } + readWriteTest( null, - 1, - 0, + srvs, + clients, 0, - 32, + DFLT_PARTITION_COUNT, writers, readers, DFLT_TEST_TIME, new InitIndexing(Integer.class, MvccTestSqlIndexValue.class), - init, + null, writer, reader); } @@ -239,6 +577,60 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { /** * @throws Exception If failed. */ + public void testSqlQueriesWithMvcc() throws Exception { + Ignite srv0 = startGrid(0); + + IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache)srv0.createCache( + cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). + setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class)); + + for (int i = 0; i < 10; i++) + cache.put(i, new MvccTestSqlIndexValue(i)); + + { + SqlFieldsQuery qry = new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue"); + + cache.query(qry).getAll(); + } + + { + SqlFieldsQuery qry = new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue"); + + cache.query(qry).getAll(); + } + + { + + SqlFieldsQuery qry = new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue"); + + cache.query(qry).getAll(); + } + + { + + SqlFieldsQuery qry = new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1=5"); + + cache.query(qry).getAll(); + } + + { + + SqlFieldsQuery qry = new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0 and idxVal1 < 5"); + + cache.query(qry).getAll(); + } + + { + + SqlFieldsQuery qry = new SqlFieldsQuery("select sum(idxVal1) from MvccTestSqlIndexValue"); + + cache.query(qry).getAll(); + } + } + + /** + * @throws Exception If failed. + */ public void testSqlSimple() throws Exception { startGrid(0);
