Repository: ignite
Updated Branches:
  refs/heads/ignite-5937 c28e02d02 -> 93bb0bc7a


ignite-5937


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93bb0bc7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93bb0bc7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93bb0bc7

Branch: refs/heads/ignite-5937
Commit: 93bb0bc7adf0cae8dd9dc5168bd64236950d954d
Parents: c28e02d
Author: sboikov <[email protected]>
Authored: Wed Oct 18 15:05:42 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed Oct 18 15:59:16 2017 +0300

----------------------------------------------------------------------
 .../cache/IgniteCacheOffheapManagerImpl.java    |  15 +-
 .../cache/mvcc/CacheCoordinatorsProcessor.java  |   4 +-
 .../cache/persistence/tree/BPlusTree.java       |   8 +-
 .../datastreamer/DataStreamerImpl.java          |   2 +-
 .../processors/query/GridQueryProcessor.java    |   2 +-
 .../cache/mvcc/CacheMvccAbstractTest.java       |   4 +-
 .../h2/twostep/GridMergeIndexIterator.java      |  16 +-
 .../h2/twostep/GridReduceQueryExecutor.java     |  26 ++-
 .../cache/mvcc/CacheMvccSqlQueriesTest.java     | 230 ++++++++++++++++++-
 9 files changed, 269 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index bea3ed7..5d0d51d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -39,9 +39,7 @@ import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
-import 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
-import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse;
 import 
org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
@@ -89,6 +87,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
 import static 
org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
+import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_START_CNTR;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion;
 import static 
org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue;
 
@@ -1420,12 +1419,12 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 // TODO IGNITE-3478: null is passed for loaded from store, 
need handle better.
                 if (mvccVer == null) {
-                    mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, 
CacheCoordinatorsProcessor.START_VER, 0L);
+                    mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, 
MVCC_START_CNTR, 0L);
 
                     newVal = true;
                 }
                 else
-                    assert val != null || 
CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion());
+                    assert val != null || 
versionForRemovedValue(mvccVer.coordinatorVersion());
 
                 if (val != null) {
                     val.valueBytes(coCtx);
@@ -1477,8 +1476,12 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
                 assert !old;
 
-                if (val != null)
+                if (val != null) {
                     incrementSize(cctx.cacheId());
+
+                    if (cctx.queries().enabled())
+                        cctx.queries().store(updateRow, mvccVer, null);
+                }
             }
             finally {
                 busyLock.leaveBusy();
@@ -2026,7 +2029,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                         if (curKey != null && row.key().equals(curKey))
                             continue;
 
-                        if 
(CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked)) {
+                        if (versionForRemovedValue(rowCrdVerMasked)) {
                             curKey = row.key();
 
                             continue;

http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
index 5e76fe1..06d617c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java
@@ -78,7 +78,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     public static final long COUNTER_NA = 0L;
 
     /** */
-    public static final long START_VER = 1L;
+    public static final long MVCC_START_CNTR = 1L;
 
     /** */
     private static final boolean STAT_CNTRS = false;
@@ -99,7 +99,7 @@ public class CacheCoordinatorsProcessor extends 
GridProcessorAdapter {
     private volatile MvccCoordinator curCrd;
 
     /** */
-    private final AtomicLong mvccCntr = new AtomicLong(START_VER);
+    private final AtomicLong mvccCntr = new AtomicLong(MVCC_START_CNTR);
 
     /** */
     private final GridAtomicLong committedCntr = new GridAtomicLong(1L);

http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 9951a76..e69284d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -4800,8 +4800,10 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             int resCnt = 0;
 
             for (int i = 0; i < cnt; i++) {
-                if (c == null || c.apply(BPlusTree.this, io, pageAddr, 
startIdx + i)) {
-                    T r = getRow(io, pageAddr, startIdx + i, x);
+                int itemIdx = startIdx + i;
+
+                if (c == null || c.apply(BPlusTree.this, io, pageAddr, 
itemIdx)) {
+                    T r = getRow(io, pageAddr, itemIdx, x);
 
                     rows = GridArrays.set(rows, resCnt++, r);
                 }
@@ -4809,7 +4811,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             GridArrays.clearTail(rows, resCnt);
 
-            return true;
+            return resCnt > 0;
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index e6300a9..dab2ec0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -134,7 +134,7 @@ public class DataStreamerImpl<K, V> implements 
IgniteDataStreamer<K, V>, Delayed
 
     /** Version which is less then any version generated on coordinator. */
     private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER =
-        new MvccCoordinatorVersionWithoutTxs(1L, 
CacheCoordinatorsProcessor.START_VER, 0L);
+        new MvccCoordinatorVersionWithoutTxs(1L, 
CacheCoordinatorsProcessor.MVCC_START_CNTR, 0L);
 
     /** Cache receiver. */
     private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER;

http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 74157d8..c70b73d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -2324,7 +2324,7 @@ public class GridQueryProcessor extends 
GridProcessorAdapter {
     public void remove(GridCacheContext cctx, CacheDataRow val, @Nullable 
MvccCoordinatorVersion newVer)
         throws IgniteCheckedException {
         assert val != null;
-        assert !cctx.mvccEnabled() || newVer == null;
+        assert cctx.mvccEnabled() || newVer == null;
 
         if (log.isDebugEnabled())
             log.debug("Remove [cacheName=" + cctx.name() + ", key=" + 
val.key()+ ", val=" + val.value() + "]");

http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
index 999144f..3078655 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java
@@ -378,7 +378,7 @@ public abstract class CacheMvccAbstractTest extends 
GridCommonAbstractTest {
 
                                         for (List<?> row : 
cache.cache.query(qry)) {
                                             Integer id = (Integer)row.get(0);
-                                            Integer val = (Integer)row.get(0);
+                                            Integer val = (Integer)row.get(1);
 
                                             MvccTestAccount old = 
accounts.put(id, new MvccTestAccount(val, 1));
 
@@ -713,7 +713,7 @@ public abstract class CacheMvccAbstractTest extends 
GridCommonAbstractTest {
      * @param node Node.
      * @throws Exception If failed.
      */
-    final void checkActiveQueriesCleanup(Ignite node) throws Exception {
+    protected final void checkActiveQueriesCleanup(Ignite node) throws 
Exception {
         final CacheCoordinatorsProcessor crd = 
((IgniteKernal)node).context().cache().context().coordinators();
 
         assertTrue("Active queries not cleared: " + node.name(), 
GridTestUtils.waitForCondition(

http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
index 1c0efb3..4518d14 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java
@@ -25,8 +25,10 @@ import java.util.NoSuchElementException;
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
 import org.h2.index.Cursor;
 import org.h2.result.Row;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Iterator that transparently and sequentially traverses a bunch of {@link 
GridMergeIndex} objects.
@@ -59,6 +61,9 @@ class GridMergeIndexIterator implements Iterator<List<?>>, 
AutoCloseable {
     /** Whether remote resources were released. */
     private boolean released;
 
+    /** */
+    private MvccQueryTracker mvccTracker;
+
     /**
      * Constructor.
      *
@@ -69,14 +74,19 @@ class GridMergeIndexIterator implements Iterator<List<?>>, 
AutoCloseable {
      * @param distributedJoins Distributed joins.
      * @throws IgniteCheckedException if failed.
      */
-    GridMergeIndexIterator(GridReduceQueryExecutor rdcExec, 
Collection<ClusterNode> nodes, ReduceQueryRun run,
-        long qryReqId, boolean distributedJoins)
+    GridMergeIndexIterator(GridReduceQueryExecutor rdcExec,
+        Collection<ClusterNode> nodes,
+        ReduceQueryRun run,
+        long qryReqId,
+        boolean distributedJoins,
+        @Nullable MvccQueryTracker mvccTracker)
         throws IgniteCheckedException {
         this.rdcExec = rdcExec;
         this.nodes = nodes;
         this.run = run;
         this.qryReqId = qryReqId;
         this.distributedJoins = distributedJoins;
+        this.mvccTracker = mvccTracker;
 
         this.idxIter = run.indexes().iterator();
 
@@ -155,7 +165,7 @@ class GridMergeIndexIterator implements Iterator<List<?>>, 
AutoCloseable {
     private void releaseIfNeeded() {
         if (!released) {
             try {
-                rdcExec.releaseRemoteResources(nodes, run, qryReqId, 
distributedJoins);
+                rdcExec.releaseRemoteResources(nodes, run, qryReqId, 
distributedJoins, mvccTracker);
             }
             finally {
                 released = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index debba5e..80b1970 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -566,7 +566,7 @@ public class GridReduceQueryExecutor {
 
             List<Integer> cacheIds = qry.cacheIds();
 
-            MvccCoordinatorVersion mvccVer = null;
+            MvccQueryTracker mvccTracker = null;
 
             // TODO IGNITE-3478.
             if (qry.mvccEnabled()) {
@@ -574,7 +574,7 @@ public class GridReduceQueryExecutor {
 
                 final GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
 
-                MvccQueryTracker mvccTracker = new 
MvccQueryTracker(cacheContext(cacheIds.get(0)), true,
+                mvccTracker = new 
MvccQueryTracker(cacheContext(cacheIds.get(0)), true,
                     new IgniteBiInClosure<AffinityTopologyVersion, 
IgniteCheckedException>() {
                     @Override public void apply(AffinityTopologyVersion 
topVer, IgniteCheckedException e) {
                         fut.onDone(null, e);
@@ -585,8 +585,6 @@ public class GridReduceQueryExecutor {
 
                 try {
                     fut.get();
-
-                    mvccVer = mvccTracker.mvccVersion();
                 }
                 catch (IgniteCheckedException e) {
                     throw new CacheException(e);
@@ -759,8 +757,10 @@ public class GridReduceQueryExecutor {
                     .parameters(params)
                     .flags(flags)
                     .timeout(timeoutMillis)
-                    .schemaName(schemaName)
-                    .mvccVersion(mvccVer);
+                    .schemaName(schemaName);
+
+                if (mvccTracker != null)
+                    req.mvccVersion(mvccTracker.mvccVersion());
 
                 if (send(nodes, req, parts == null ? null : new 
ExplicitPartitionsSpecializer(qryMap), false)) {
                     awaitAllReplies(r, nodes, cancel);
@@ -795,7 +795,12 @@ public class GridReduceQueryExecutor {
 
                 if (!retry) {
                     if (skipMergeTbl) {
-                        resIter = new GridMergeIndexIterator(this, finalNodes, 
r, qryReqId, qry.distributedJoins());
+                        resIter = new GridMergeIndexIterator(this,
+                            finalNodes,
+                            r,
+                            qryReqId,
+                            qry.distributedJoins(),
+                            mvccTracker);
 
                         release = false;
                     }
@@ -865,7 +870,7 @@ public class GridReduceQueryExecutor {
             }
             finally {
                 if (release) {
-                    releaseRemoteResources(finalNodes, r, qryReqId, 
qry.distributedJoins());
+                    releaseRemoteResources(finalNodes, r, qryReqId, 
qry.distributedJoins(), mvccTracker);
 
                     if (!skipMergeTbl) {
                         for (int i = 0, mapQrys = qry.mapQueries().size(); i < 
mapQrys; i++)
@@ -1060,7 +1065,10 @@ public class GridReduceQueryExecutor {
      * @param distributedJoins Distributed join flag.
      */
     public void releaseRemoteResources(Collection<ClusterNode> nodes, 
ReduceQueryRun r, long qryReqId,
-        boolean distributedJoins) {
+        boolean distributedJoins, MvccQueryTracker mvccTracker) {
+        if (mvccTracker != null)
+            mvccTracker.onQueryDone();
+
         // For distributedJoins need always send cancel request to cleanup 
resources.
         if (distributedJoins)
             send(nodes, new GridQueryCancelRequest(qryReqId), null, false);

http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
----------------------------------------------------------------------
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
index 66ae606..c14fe65 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java
@@ -18,9 +18,15 @@
 package org.apache.ignite.internal.processors.cache.mvcc;
 
 import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.SqlQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -33,6 +39,7 @@ import static 
org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 /**
  * TODO IGNITE-3478: text/spatial indexes with mvcc.
  * TODO IGNITE-3478: dynamic index create.
+ * TODO IGNITE-3478: tests with/without inline.
  */
 @SuppressWarnings("unchecked")
 public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest {
@@ -42,7 +49,7 @@ public class CacheMvccSqlQueriesTest extends 
CacheMvccAbstractTest {
     public void testAccountsTxSql_SingleNode_SinglePartition() throws 
Exception {
         accountsTxReadAll(1, 0, 0, 1, new 
IgniteInClosure<CacheConfiguration>() {
             @Override public void apply(CacheConfiguration ccfg) {
-                ccfg.setIndexedTypes(Integer.class, MvccTestAccount.class);
+                ccfg.setIndexedTypes(Integer.class, 
MvccTestAccount.class).setSqlIndexMaxInlineSize(0);
             }
         }, false, ReadMode.SQL_ALL);
     }
@@ -50,35 +57,236 @@ public class CacheMvccSqlQueriesTest extends 
CacheMvccAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testAccountsTxSql_WithRemoves_SingleNode_SinglePartition() 
throws Exception {
+        accountsTxReadAll(1, 0, 0, 1, new 
IgniteInClosure<CacheConfiguration>() {
+            @Override public void apply(CacheConfiguration ccfg) {
+                ccfg.setIndexedTypes(Integer.class, 
MvccTestAccount.class).setSqlIndexMaxInlineSize(0);
+            }
+        }, true, ReadMode.SQL_ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_SingleNode() throws Exception {
+        accountsTxReadAll(1, 0, 0, 64, new 
IgniteInClosure<CacheConfiguration>() {
+            @Override public void apply(CacheConfiguration ccfg) {
+                ccfg.setIndexedTypes(Integer.class, 
MvccTestAccount.class).setSqlIndexMaxInlineSize(0);
+            }
+        }, false, ReadMode.SQL_ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAccountsTxSql_WithRemoves_SingleNode() throws Exception {
+        accountsTxReadAll(1, 0, 0, 64, new 
IgniteInClosure<CacheConfiguration>() {
+            @Override public void apply(CacheConfiguration ccfg) {
+                ccfg.setIndexedTypes(Integer.class, 
MvccTestAccount.class).setSqlIndexMaxInlineSize(0);
+            }
+        }, true, ReadMode.SQL_ALL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testSqlSimple() throws Exception {
         Ignite srv0 = startGrid(0);
 
         IgniteCache<Integer, MvccTestSqlIndexValue> cache =  
(IgniteCache)srv0.createCache(
             cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT).
-            setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class).
-            setSqlIndexMaxInlineSize(0));
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class).
+                setSqlIndexMaxInlineSize(0));
+
+        Map<Integer, Integer> expVals = new HashMap<>();
+
+        checkValues(expVals, cache);
 
         cache.put(1, new MvccTestSqlIndexValue(1));
+        expVals.put(1, 1);
+
+        checkValues(expVals, cache);
+
         cache.put(1, new MvccTestSqlIndexValue(2));
+        expVals.put(1, 2);
+
+        checkValues(expVals, cache);
 
         cache.put(2, new MvccTestSqlIndexValue(1));
+        expVals.put(2, 1);
         cache.put(3, new MvccTestSqlIndexValue(1));
+        expVals.put(3, 1);
         cache.put(4, new MvccTestSqlIndexValue(1));
+        expVals.put(4, 1);
 
-        SqlQuery<Integer, MvccTestSqlIndexValue> qry =
-            new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = 1");
+        checkValues(expVals, cache);
 
-        List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res;
+        cache.remove(1);
+        expVals.remove(1);
 
-         res = cache.query(qry).getAll();
+        checkValues(expVals, cache);
 
-        assertEquals(1, res.size());
+        checkNoValue(1, cache);
 
-        cache.remove(1);
+        cache.put(1, new MvccTestSqlIndexValue(10));
+        expVals.put(1, 10);
+
+        checkValues(expVals, cache);
+
+        checkActiveQueriesCleanup(srv0);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSqlSimplePutRemoveRandom() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache) 
srv0.createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, 
DFLT_PARTITION_COUNT).
+                setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class).
+                setSqlIndexMaxInlineSize(0));
+
+        Map<Integer, Integer> expVals = new HashMap<>();
+
+        final int KEYS = 100;
+        final int VALS = 10;
+
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+        long stopTime = System.currentTimeMillis() + 10_000;
+
+        for (int i = 0; i < 100_000; i++) {
+            Integer key = rnd.nextInt(KEYS);
+
+            if (rnd.nextInt(5) == 0) {
+                cache.remove(key);
+
+                expVals.remove(key);
+            }
+            else {
+                Integer val = rnd.nextInt(VALS);
+
+                cache.put(key, new MvccTestSqlIndexValue(val));
+
+                expVals.put(key, val);
+            }
 
-        res = cache.query(qry).getAll();
+            checkValues(expVals, cache);
 
-        assertEquals(0, res.size());
+            if (System.currentTimeMillis() > stopTime) {
+                info("Stop test, iteration: " + i);
+
+                break;
+            }
+        }
+
+        for (int i = 0; i < KEYS; i++) {
+            if (!expVals.containsKey(i))
+                checkNoValue(i, cache);
+        }
+
+        checkActiveQueriesCleanup(srv0);
+    }
+
+    /**
+     * @param key Key.
+     * @param cache Cache.
+     */
+    private void checkNoValue(Object key, IgniteCache cache) {
+        SqlQuery<Integer, MvccTestSqlIndexValue> qry;
+
+        qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = ?");
+
+        qry.setArgs(key);
+
+        List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = 
cache.query(qry).getAll();
+
+        assertTrue(res.isEmpty());
+    }
+
+    /**
+     * @param expVals Expected values.
+     * @param cache Cache.
+     */
+    private void checkValues(Map<Integer, Integer> expVals, 
IgniteCache<Integer, MvccTestSqlIndexValue> cache) {
+        SqlQuery<Integer, MvccTestSqlIndexValue> qry;
+
+        qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "true");
+
+        Map<Integer, Integer> vals = new HashMap<>();
+
+        for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : 
cache.query(qry).getAll())
+            assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+        assertEquals(expVals, vals);
+
+        qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key >= 0");
+
+        vals = new HashMap<>();
+
+        for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : 
cache.query(qry).getAll())
+            assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+        assertEquals(expVals, vals);
+
+        qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "idxVal1 >= 0");
+
+        vals = new HashMap<>();
+
+        for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : 
cache.query(qry).getAll())
+            assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+        assertEquals(expVals, vals);
+
+        Map<Integer, Set<Integer>> expIdxVals = new HashMap<>();
+
+        for (Map.Entry<Integer, Integer> e : expVals.entrySet()) {
+            qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = ?");
+
+            qry.setArgs(e.getKey());
+
+            List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = 
cache.query(qry).getAll();
+
+            assertEquals(1, res.size());
+            assertEquals(e.getKey(), res.get(0).getKey());
+            assertEquals(e.getValue(), (Integer)res.get(0).getValue().idxVal1);
+
+            SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select _key, 
idxVal1 from MvccTestSqlIndexValue where _key=?");
+            fieldsQry.setArgs(e.getKey());
+
+            List<List<?>> fieldsRes = cache.query(fieldsQry).getAll();
+
+            assertEquals(1, fieldsRes.size());
+            assertEquals(e.getKey(), fieldsRes.get(0).get(0));
+            assertEquals(e.getValue(), fieldsRes.get(0).get(1));
+
+            Integer val = e.getValue();
+
+            Set<Integer> keys = expIdxVals.get(val);
+
+            if (keys == null)
+                expIdxVals.put(val, keys = new HashSet<>());
+
+            assertTrue(keys.add(e.getKey()));
+        }
+
+        for (Map.Entry<Integer, Set<Integer>> expE : expIdxVals.entrySet()) {
+            qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "idxVal1 = ?");
+            qry.setArgs(expE.getKey());
+
+            vals = new HashMap<>();
+
+            for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : 
cache.query(qry).getAll()) {
+                assertNull(vals.put(e.getKey(), e.getValue().idxVal1));
+
+                assertEquals(expE.getKey(), (Integer)e.getValue().idxVal1);
+
+                assertTrue(expE.getValue().contains(e.getKey()));
+            }
+
+            assertEquals(expE.getValue().size(), vals.size());
+        }
     }
 
     /**

Reply via email to