Repository: ignite
Updated Branches:
  refs/heads/ignite-5937 0014b2f6e -> b1e50296d


ignite-5937


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1e50296
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1e50296
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1e50296

Branch: refs/heads/ignite-5937
Commit: b1e50296d38b92a70cb058d7f2b471de4c3cadaf
Parents: 0014b2f
Author: sboikov <[email protected]>
Authored: Thu Oct 19 16:11:23 2017 +0300
Committer: sboikov <[email protected]>
Committed: Thu Oct 19 18:24:52 2017 +0300

----------------------------------------------------------------------
 .../cache/mvcc/CacheMvccAbstractTest.java       |  54 +-
 .../processors/query/h2/database/H2Tree.java    |  20 +-
 .../query/h2/database/H2TreeIndex.java          |  12 +
 .../cache/mvcc/CacheMvccSqlQueriesTest.java     | 540 ++++++++++++++++---
 4 files changed, 526 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b1e50296/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index 3078655..ced6dfe 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.mvcc;
 
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -332,13 +333,15 @@ public abstract class CacheMvccAbstractTest extends 
GridCommonAbstractTest {
 
                     Map<Integer, Integer> lastUpdateCntrs = new HashMap<>();
 
+                    SqlFieldsQuery sumQry = new SqlFieldsQuery("select 
sum(val) from MvccTestAccount");
+
                     while (!stop.get()) {
                         while (keys.size() < ACCOUNTS)
                             keys.add(rnd.nextInt(ACCOUNTS));
 
                         TestCache<Integer, MvccTestAccount> cache = 
randomCache(caches, rnd);
 
-                        Map<Integer, MvccTestAccount> accounts;
+                        Map<Integer, MvccTestAccount> accounts = null;
 
                         try {
                             switch (readMode) {
@@ -389,6 +392,18 @@ public abstract class CacheMvccAbstractTest extends 
GridCommonAbstractTest {
                                     break;
                                 }
 
+                                case SQL_SUM: {
+                                    List<List<?>> res = 
cache.cache.query(sumQry).getAll();
+
+                                    assertEquals(1, res.size());
+
+                                    BigDecimal sum = 
(BigDecimal)res.get(0).get(0);
+
+                                    assertEquals(ACCOUNT_START_VAL * ACCOUNTS, 
sum.intValue());
+
+                                    break;
+                                }
+
                                 default: {
                                     fail();
 
@@ -400,29 +415,31 @@ public abstract class CacheMvccAbstractTest extends 
GridCommonAbstractTest {
                             cache.readUnlock();
                         }
 
-                        if (!withRmvs)
-                            assertEquals(ACCOUNTS, accounts.size());
+                        if (accounts != null) {
+                            if (!withRmvs)
+                                assertEquals(ACCOUNTS, accounts.size());
 
-                        int sum = 0;
+                            int sum = 0;
 
-                        for (int i = 0; i < ACCOUNTS; i++) {
-                            MvccTestAccount account = accounts.get(i);
+                            for (int i = 0; i < ACCOUNTS; i++) {
+                                MvccTestAccount account = accounts.get(i);
 
-                            if (account != null) {
-                                sum += account.val;
+                                if (account != null) {
+                                    sum += account.val;
 
-                                Integer cntr = lastUpdateCntrs.get(i);
+                                    Integer cntr = lastUpdateCntrs.get(i);
 
-                                if (cntr != null)
-                                    assertTrue(cntr <= account.updateCnt);
+                                    if (cntr != null)
+                                        assertTrue(cntr <= account.updateCnt);
 
-                                lastUpdateCntrs.put(i, cntr);
+                                    lastUpdateCntrs.put(i, cntr);
+                                }
+                                else
+                                    assertTrue(withRmvs);
                             }
-                            else
-                                assertTrue(withRmvs);
-                        }
 
-                        assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
+                            assertEquals(ACCOUNTS * ACCOUNT_START_VAL, sum);
+                        }
                     }
 
                     if (idx == 0) {
@@ -827,7 +844,10 @@ public abstract class CacheMvccAbstractTest extends 
GridCommonAbstractTest {
         SCAN,
 
         /** */
-        SQL_ALL
+        SQL_ALL,
+
+        /** */
+        SQL_SUM
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1e50296/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
index 03c5c68..9231775 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2Tree.java
@@ -267,9 +267,14 @@ public abstract class H2Tree extends 
BPlusTree<GridH2SearchRow, GridH2Row> {
         return mvccCompare(r1, r2);
     }
 
+    /**
+     * @param io IO.
+     * @param pageAddr Page address.
+     * @param idx Item index.
+     * @param r2 Search row.
+     * @return Comparison result.
+     */
     private int mvccCompare(H2RowLinkIO io, long pageAddr, int idx, 
GridH2SearchRow r2) {
-        int c = 0;
-
         if (mvccEnabled && !r2.indexSearchRow()) {
             long crdVer1 = io.getMvccCoordinatorVersion(pageAddr, idx);
             long crdVer2 = r2.mvccCoordinatorVersion();
@@ -277,7 +282,7 @@ public abstract class H2Tree extends 
BPlusTree<GridH2SearchRow, GridH2Row> {
             assert crdVer1 != 0;
             assert crdVer2 != 0 : r2;
 
-            c = Long.compare(unmaskCoordinatorVersion(crdVer1), 
unmaskCoordinatorVersion(crdVer2));
+            int c = Long.compare(unmaskCoordinatorVersion(crdVer1), 
unmaskCoordinatorVersion(crdVer2));
 
             if (c != 0)
                 return c;
@@ -287,10 +292,10 @@ public abstract class H2Tree extends 
BPlusTree<GridH2SearchRow, GridH2Row> {
             assert cntr != MVCC_COUNTER_NA;
             assert r2.mvccCounter() != MVCC_COUNTER_NA : r2;
 
-            c = Long.compare(cntr, r2.mvccCounter());
+            return Long.compare(cntr, r2.mvccCounter());
         }
 
-        return c;
+        return 0;
     }
 
     /**
@@ -314,10 +319,7 @@ public abstract class H2Tree extends 
BPlusTree<GridH2SearchRow, GridH2Row> {
             assert r1.mvccCounter() != MVCC_COUNTER_NA : r1;
             assert r2.mvccCounter() != MVCC_COUNTER_NA : r2;
 
-            c = Long.compare(r1.mvccCounter(), r2.mvccCounter());
-
-            if (c != 0)
-                return c;
+            return Long.compare(r1.mvccCounter(), r2.mvccCounter());
         }
 
         return 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1e50296/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
index 72b6e2a..cdaa5b0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java
@@ -321,6 +321,18 @@ public class H2TreeIndex extends GridH2IndexBase {
 
             H2Tree tree = treeForRead(seg);
 
+            if (cctx.mvccEnabled()) {
+                GridH2QueryContext qctx = GridH2QueryContext.get();
+
+                assert qctx != null;
+
+                H2TreeMvccFilterClosure mvccFilter = qctx.mvccFilter();
+
+                assert mvccFilter != null;
+
+                // TODO IGNITE-3478
+            }
+
             GridH2Row row = b ? tree.findFirst(): tree.findLast();
 
             return new SingleRowCursor(row);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1e50296/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
index 08b7552..5115eb1 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
@@ -18,11 +18,14 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.processor.MutableEntry;
@@ -71,6 +74,13 @@ public class CacheMvccSqlQueriesTest extends 
CacheMvccAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testAccountsTxSumSql_SingleNode() throws Exception {
+        accountsTxReadAll(1, 0, 0, 64, new InitIndexing(Integer.class, 
MvccTestAccount.class), false, ReadMode.SQL_SUM);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testAccountsTxSql_WithRemoves_SingleNode() throws Exception {
         accountsTxReadAll(1, 0, 0, 64, new InitIndexing(Integer.class, 
MvccTestAccount.class), true, ReadMode.SQL_ALL);
     }
@@ -78,7 +88,29 @@ public class CacheMvccSqlQueriesTest extends 
CacheMvccAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testUpdateSingleValue() throws Exception {
+    public void testAccountsTxSql_ClientServer_Backups2() throws Exception {
+        accountsTxReadAll(4, 2, 2, 64, new InitIndexing(Integer.class, 
MvccTestAccount.class), false, ReadMode.SQL_ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateSingleValue_SingleNode() throws Exception {
+        updateSingleValue(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateSingleValue_ClientServer() throws Exception {
+        updateSingleValue(false);
+    }
+
+    /**
+     * @param singleNode {@code True} for test with single node.
+     * @throws Exception If failed.
+     */
+    private void updateSingleValue(boolean singleNode) throws Exception {
         final int VALS = 100;
 
         final int writers = 4;
@@ -99,139 +131,445 @@ public class CacheMvccSqlQueriesTest extends 
CacheMvccAbstractTest {
         };
 
         GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
-                new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
-                    @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
-                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                        int cnt = 0;
+                    int cnt = 0;
 
-                        while (!stop.get()) {
-                            TestCache<Integer, MvccTestSqlIndexValue> cache = 
randomCache(caches, rnd);
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = 
randomCache(caches, rnd);
 
-                            try {
-                                Integer key = rnd.nextInt(VALS);
+                        try {
+                            Integer key = rnd.nextInt(VALS);
 
-                                cache.cache.invoke(key, new 
CacheEntryProcessor<Integer, MvccTestSqlIndexValue, Object>() {
-                                    @Override public Object 
process(MutableEntry<Integer, MvccTestSqlIndexValue> e, Object... args) {
-                                        Integer key = e.getKey();
+                            cache.cache.invoke(key, new 
CacheEntryProcessor<Integer, MvccTestSqlIndexValue, Object>() {
+                                @Override public Object 
process(MutableEntry<Integer, MvccTestSqlIndexValue> e, Object... args) {
+                                    Integer key = e.getKey();
 
-                                        MvccTestSqlIndexValue val = 
e.getValue();
+                                    MvccTestSqlIndexValue val = e.getValue();
 
-                                        int newIdxVal;
+                                    int newIdxVal;
 
-                                        if (val.idxVal1 < INC_BY) {
-                                            assertEquals(key.intValue(), 
val.idxVal1);
+                                    if (val.idxVal1 < INC_BY) {
+                                        assertEquals(key.intValue(), 
val.idxVal1);
 
-                                            newIdxVal = val.idxVal1 + INC_BY;
-                                        }
-                                        else {
-                                            assertEquals(INC_BY + key, 
val.idxVal1);
+                                        newIdxVal = val.idxVal1 + INC_BY;
+                                    }
+                                    else {
+                                        assertEquals(INC_BY + key, 
val.idxVal1);
 
-                                            newIdxVal = key;
-                                        }
+                                        newIdxVal = key;
+                                    }
 
-                                        e.setValue(new 
MvccTestSqlIndexValue(newIdxVal));
+                                    e.setValue(new 
MvccTestSqlIndexValue(newIdxVal));
 
-                                        return null;
-                                    }
-                                });
+                                    return null;
+                                }
+                            });
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    SqlFieldsQuery[] qrys = new SqlFieldsQuery[3];
+
+                    qrys[0] = new SqlFieldsQuery(
+                            "select _key, idxVal1 from MvccTestSqlIndexValue 
where idxVal1=?");
+
+                    qrys[1] = new SqlFieldsQuery(
+                            "select _key, idxVal1 from MvccTestSqlIndexValue 
where idxVal1=? or idxVal1=?");
+
+                    qrys[2] = new SqlFieldsQuery(
+                            "select _key, idxVal1 from MvccTestSqlIndexValue 
where _key=?");
+
+                    while (!stop.get()) {
+                        Integer key = rnd.nextInt(VALS);
+
+                        int qryIdx = rnd.nextInt(3);
+
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = 
randomCache(caches, rnd);
+
+                        List<List<?>> res;
+
+                        try {
+                            SqlFieldsQuery qry = qrys[qryIdx];
+
+                            if (qryIdx == 1)
+                                qry.setArgs(key, key + INC_BY);
+                            else
+                                qry.setArgs(key);
+
+                            res = cache.cache.query(qry).getAll();
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+
+                        assertTrue(qryIdx == 0 || !res.isEmpty());
+
+                        if (!res.isEmpty()) {
+                            assertEquals(1, res.size());
+
+                            List<?> resVals = res.get(0);
+
+                            Integer key0 = (Integer)resVals.get(0);
+                            Integer val0 = (Integer)resVals.get(1);
+
+                            assertEquals(key, key0);
+                            assertTrue(val0.equals(key) || val0.equals(key + 
INC_BY));
+                        }
+                    }
+
+                    if (idx == 0) {
+                        SqlFieldsQuery qry = new SqlFieldsQuery("select _key, 
idxVal1 from MvccTestSqlIndexValue");
+
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = 
randomCache(caches, rnd);
+
+                        List<List<?>> res;
+
+                        try {
+                            res = cache.cache.query(qry).getAll();
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+
+                        assertEquals(VALS, res.size());
+
+                        for (List<?> vals : res)
+                            info("Value: " + vals);
+                    }
+                }
+            };
+
+        int srvs;
+        int clients;
+
+        if (singleNode) {
+            srvs = 1;
+            clients = 0;
+        }
+        else {
+            srvs = 4;
+            clients = 2;
+        }
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, MvccTestSqlIndexValue.class),
+            init,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCountTransactional_SingleNode() throws Exception {
+      countTransactional(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCountTransactional_ClientServer() throws Exception {
+        countTransactional(false);
+    }
+
+    /**
+     * @param singleNode {@code True} for test with single node.
+     * @throws Exception If failed.
+     */
+    private void countTransactional(boolean singleNode) throws Exception {
+        final int writers = 4;
+
+        final int readers = 4;
+
+        final int THREAD_KEY_RANGE = 100;
+
+        final int VAL_RANGE = 10;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    int min = idx * THREAD_KEY_RANGE;
+                    int max = min + THREAD_KEY_RANGE;
+
+                    info("Thread range [min=" + min + ", max=" + max + ']');
+
+                    int cnt = 0;
+
+                    Set<Integer> keys = new LinkedHashSet<>();
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = 
randomCache(caches, rnd);
+
+                        try {
+                            // Add or remove 10 keys.
+                            if (!keys.isEmpty() && (keys.size() == 
THREAD_KEY_RANGE || rnd.nextInt(3) == 0 )) {
+                                Set<Integer> rmvKeys = new HashSet<>();
+
+                                for (Integer key : keys) {
+                                    rmvKeys.add(key);
+
+                                    if (rmvKeys.size() == 10)
+                                        break;
+                                }
+
+                                assertEquals(10, rmvKeys.size());
+
+                                cache.cache.removeAll(rmvKeys);
+
+                                keys.removeAll(rmvKeys);
                             }
-                            finally {
-                                cache.readUnlock();
+                            else {
+                                TreeMap<Integer, MvccTestSqlIndexValue> map = 
new TreeMap<>();
+
+                                while (map.size() != 10) {
+                                    Integer key = rnd.nextInt(min, max);
+
+                                    if (keys.add(key))
+                                        map.put(key, new 
MvccTestSqlIndexValue(rnd.nextInt(VAL_RANGE)));
+                                }
+
+                                assertEquals(10, map.size());
+
+                                cache.cache.putAll(map);
                             }
                         }
-
-                        info("Writer finished, updates: " + cnt);
+                        finally {
+                            cache.readUnlock();
+                        }
                     }
-                };
+
+                    info("Writer finished, updates: " + cnt);
+                }
+            };
 
         GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+            new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    List<SqlFieldsQuery> qrys = new ArrayList<>();
+
+                    qrys.add(new SqlFieldsQuery("select count(*) from 
MvccTestSqlIndexValue"));
+
+                    qrys.add(new SqlFieldsQuery(
+                        "select count(*) from MvccTestSqlIndexValue where 
idxVal1 >= 0 and idxVal1 <= " + VAL_RANGE));
+
+                    while (!stop.get()) {
+                        TestCache<Integer, MvccTestSqlIndexValue> cache = 
randomCache(caches, rnd);
+
+                        try {
+                            for (SqlFieldsQuery qry : qrys) {
+                                List<List<?>> res = 
cache.cache.query(qry).getAll();
+
+                                assertEquals(1, res.size());
+
+                                Long cnt = (Long)res.get(0).get(0);
+
+                                assertTrue(cnt % 10 == 0);
+                            }
+                        }
+                        finally {
+                            cache.readUnlock();
+                        }
+                    }
+                }
+            };
+
+        int srvs;
+        int clients;
+
+        if (singleNode) {
+            srvs = 1;
+            clients = 0;
+        }
+        else {
+            srvs = 4;
+            clients = 2;
+        }
+
+        readWriteTest(
+            null,
+            srvs,
+            clients,
+            0,
+            DFLT_PARTITION_COUNT,
+            writers,
+            readers,
+            DFLT_TEST_TIME,
+            new InitIndexing(Integer.class, MvccTestSqlIndexValue.class),
+            null,
+            writer,
+            reader);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMaxTransactional_SingleNode() throws Exception {
+        maxMinTransactional(true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMaxTransactional_ClientServer() throws Exception {
+        maxMinTransactional(false);
+    }
+
+    /**
+     * @param singleNode {@code True} for test with single node.
+     * @throws Exception If failed.
+     */
+    private void maxMinTransactional(boolean singleNode) throws Exception {
+        final int writers = 1;
+
+        final int readers = 1;
+
+        final int THREAD_OPS = 10;
+
+        final int OP_RANGE = 10;
+
+        final int THREAD_KEY_RANGE = OP_RANGE * THREAD_OPS;
+
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer =
                 new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
                     @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
                         ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                        SqlFieldsQuery[] qrys = new SqlFieldsQuery[3];
+                        int min = idx * THREAD_KEY_RANGE;
 
-                        qrys[0] = new SqlFieldsQuery(
-                                "select _key, idxVal1 from 
MvccTestSqlIndexValue where idxVal1=?");
+                        info("Thread range [start=" + min + ']');
+
+                        int cnt = 0;
 
-                        qrys[1] = new SqlFieldsQuery(
-                                "select _key, idxVal1 from 
MvccTestSqlIndexValue where idxVal1=? or idxVal1=?");
+                        boolean add = true;
 
-                        qrys[2] = new SqlFieldsQuery(
-                                "select _key, idxVal1 from 
MvccTestSqlIndexValue where _key=?");
+                        int op = 0;
 
                         while (!stop.get()) {
-                            Integer key = rnd.nextInt(VALS);
+                            TestCache<Integer, MvccTestSqlIndexValue> cache = 
randomCache(caches, rnd);
 
-                            int qryIdx = rnd.nextInt(3);
+                            try {
+                                int startKey = min + op * OP_RANGE;
 
-                            TestCache<Integer, MvccTestSqlIndexValue> cache = 
randomCache(caches, rnd);
+                                if (add) {
+                                    Map<Integer, MvccTestSqlIndexValue> vals = 
new HashMap<>();
 
-                            List<List<?>> res;
+                                    for (int i = 0; i < 10; i++) {
+                                        Integer key = startKey + i + 1;
 
-                            try {
-                                SqlFieldsQuery qry = qrys[qryIdx];
+                                        vals.put(key, new 
MvccTestSqlIndexValue(key));
+                                    }
+
+                                    cache.cache.putAll(vals);
 
-                                if (qryIdx == 1)
-                                    qry.setArgs(key, key + INC_BY);
-                                else
-                                    qry.setArgs(key);
+                                    info("put " + vals.keySet());
+                                }
+                                else {
+                                    Set<Integer> rmvKeys = new HashSet<>();
 
-                                res = cache.cache.query(qry).getAll();
+                                    for (int i = 0; i < 10; i++)
+                                        rmvKeys.add(startKey + i + 1);
+
+                                    cache.cache.removeAll(rmvKeys);
+
+                                    info("remove " + rmvKeys);
+                                }
+
+                                if (++op == THREAD_OPS) {
+                                    add = !add;
+
+                                    op = 0;
+                                }
                             }
                             finally {
                                 cache.readUnlock();
                             }
+                        }
 
-                            assertTrue(qryIdx == 0 || !res.isEmpty());
-
-                            if (!res.isEmpty()) {
-                                assertEquals(1, res.size());
+                        info("Writer finished, updates: " + cnt);
+                    }
+                };
 
-                                List<?> resVals = res.get(0);
+        GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader =
+                new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() {
+                    @Override public void apply(Integer idx, List<TestCache> 
caches, AtomicBoolean stop) {
+                        ThreadLocalRandom rnd = ThreadLocalRandom.current();
 
-                                Integer key0 = (Integer)resVals.get(0);
-                                Integer val0 = (Integer)resVals.get(1);
+                        List<SqlFieldsQuery> qrys = new ArrayList<>();
 
-                                assertEquals(key, key0);
-                                assertTrue(val0.equals(key) || val0.equals(key 
+ INC_BY));
-                            }
-                        }
+                        qrys.add(new SqlFieldsQuery("select max(idxVal1) from 
MvccTestSqlIndexValue"));
 
-                        if (idx == 0) {
-                            SqlFieldsQuery qry = new SqlFieldsQuery("select 
_key, idxVal1 from MvccTestSqlIndexValue");
+                        qrys.add(new SqlFieldsQuery("select min(idxVal1) from 
MvccTestSqlIndexValue"));
 
+                        while (!stop.get()) {
                             TestCache<Integer, MvccTestSqlIndexValue> cache = 
randomCache(caches, rnd);
 
-                            List<List<?>> res;
-
                             try {
-                                res = cache.cache.query(qry).getAll();
+                                for (SqlFieldsQuery qry : qrys) {
+                                    List<List<?>> res = 
cache.cache.query(qry).getAll();
+
+                                    assertEquals(1, res.size());
+
+                                    Integer m = (Integer)res.get(0).get(0);
+
+                                    assertTrue(m == null || m % 10 == 0);
+                                }
                             }
                             finally {
                                 cache.readUnlock();
                             }
-
-                            assertEquals(VALS, res.size());
-
-                            for (List<?> vals : res)
-                                info("Value: " + vals);
                         }
                     }
                 };
 
+        int srvs;
+        int clients;
+
+        if (singleNode) {
+            srvs = 1;
+            clients = 0;
+        }
+        else {
+            srvs = 4;
+            clients = 2;
+        }
+
         readWriteTest(
             null,
-            1,
-            0,
+            srvs,
+            clients,
             0,
-            32,
+            DFLT_PARTITION_COUNT,
             writers,
             readers,
             DFLT_TEST_TIME,
             new InitIndexing(Integer.class, MvccTestSqlIndexValue.class),
-            init,
+            null,
             writer,
             reader);
     }
@@ -239,6 +577,60 @@ public class CacheMvccSqlQueriesTest extends 
CacheMvccAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testSqlQueriesWithMvcc() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        IgniteCache<Integer, MvccTestSqlIndexValue> cache =  
(IgniteCache)srv0.createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT).
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class));
+
+        for (int i = 0; i < 10; i++)
+            cache.put(i, new MvccTestSqlIndexValue(i));
+
+        {
+            SqlFieldsQuery qry = new SqlFieldsQuery("select max(idxVal1) from 
MvccTestSqlIndexValue");
+
+            cache.query(qry).getAll();
+        }
+
+        {
+            SqlFieldsQuery qry = new SqlFieldsQuery("select min(idxVal1) from 
MvccTestSqlIndexValue");
+
+            cache.query(qry).getAll();
+        }
+
+        {
+
+            SqlFieldsQuery qry = new SqlFieldsQuery("select count(*) from 
MvccTestSqlIndexValue");
+
+            cache.query(qry).getAll();
+        }
+
+        {
+
+            SqlFieldsQuery qry = new SqlFieldsQuery("select count(*) from 
MvccTestSqlIndexValue where idxVal1=5");
+
+            cache.query(qry).getAll();
+        }
+
+        {
+
+            SqlFieldsQuery qry = new SqlFieldsQuery("select count(*) from 
MvccTestSqlIndexValue where idxVal1 >= 0 and idxVal1 < 5");
+
+            cache.query(qry).getAll();
+        }
+
+        {
+
+            SqlFieldsQuery qry = new SqlFieldsQuery("select sum(idxVal1) from 
MvccTestSqlIndexValue");
+
+            cache.query(qry).getAll();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testSqlSimple() throws Exception {
         startGrid(0);
 

Reply via email to