Repository: ignite Updated Branches: refs/heads/ignite-5937 c28e02d02 -> 93bb0bc7a
ignite-5937 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93bb0bc7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93bb0bc7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93bb0bc7 Branch: refs/heads/ignite-5937 Commit: 93bb0bc7adf0cae8dd9dc5168bd64236950d954d Parents: c28e02d Author: sboikov <[email protected]> Authored: Wed Oct 18 15:05:42 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Oct 18 15:59:16 2017 +0300 ---------------------------------------------------------------------- .../cache/IgniteCacheOffheapManagerImpl.java | 15 +- .../cache/mvcc/CacheCoordinatorsProcessor.java | 4 +- .../cache/persistence/tree/BPlusTree.java | 8 +- .../datastreamer/DataStreamerImpl.java | 2 +- .../processors/query/GridQueryProcessor.java | 2 +- .../cache/mvcc/CacheMvccAbstractTest.java | 4 +- .../h2/twostep/GridMergeIndexIterator.java | 16 +- .../h2/twostep/GridReduceQueryExecutor.java | 26 ++- .../cache/mvcc/CacheMvccSqlQueriesTest.java | 230 ++++++++++++++++++- 9 files changed, 269 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index bea3ed7..5d0d51d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -39,9 +39,7 @@ import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion; -import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionResponse; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersionWithoutTxs; import org.apache.ignite.internal.processors.cache.mvcc.MvccCounter; import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList; @@ -89,6 +87,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; +import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.MVCC_START_CNTR; import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.unmaskCoordinatorVersion; import static org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsProcessor.versionForRemovedValue; @@ -1420,12 +1419,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager // TODO IGNITE-3478: null is passed for loaded from store, need handle better. if (mvccVer == null) { - mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L); + mvccVer = new MvccCoordinatorVersionWithoutTxs(1L, MVCC_START_CNTR, 0L); newVal = true; } else - assert val != null || CacheCoordinatorsProcessor.versionForRemovedValue(mvccVer.coordinatorVersion()); + assert val != null || versionForRemovedValue(mvccVer.coordinatorVersion()); if (val != null) { val.valueBytes(coCtx); @@ -1477,8 +1476,12 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert !old; - if (val != null) + if (val != null) { incrementSize(cctx.cacheId()); + + if (cctx.queries().enabled()) + cctx.queries().store(updateRow, mvccVer, null); + } } finally { busyLock.leaveBusy(); @@ -2026,7 +2029,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (curKey != null && row.key().equals(curKey)) continue; - if (CacheCoordinatorsProcessor.versionForRemovedValue(rowCrdVerMasked)) { + if (versionForRemovedValue(rowCrdVerMasked)) { curKey = row.key(); continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index 5e76fe1..06d617c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -78,7 +78,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { public static final long COUNTER_NA = 0L; /** */ - public static final long START_VER = 1L; + public static final long MVCC_START_CNTR = 1L; /** */ private static final boolean STAT_CNTRS = false; @@ -99,7 +99,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { private volatile MvccCoordinator curCrd; /** */ - private final AtomicLong mvccCntr = new AtomicLong(START_VER); + private final AtomicLong mvccCntr = new AtomicLong(MVCC_START_CNTR); /** */ private final GridAtomicLong committedCntr = new GridAtomicLong(1L); http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java index 9951a76..e69284d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java @@ -4800,8 +4800,10 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements int resCnt = 0; for (int i = 0; i < cnt; i++) { - if (c == null || c.apply(BPlusTree.this, io, pageAddr, startIdx + i)) { - T r = getRow(io, pageAddr, startIdx + i, x); + int itemIdx = startIdx + i; + + if (c == null || c.apply(BPlusTree.this, io, pageAddr, itemIdx)) { + T r = getRow(io, pageAddr, itemIdx, x); rows = GridArrays.set(rows, resCnt++, r); } @@ -4809,7 +4811,7 @@ public abstract class BPlusTree<L, T extends L> extends DataStructure implements GridArrays.clearTail(rows, resCnt); - return true; + return resCnt > 0; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java index e6300a9..dab2ec0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java @@ -134,7 +134,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed /** Version which is less then any version generated on coordinator. */ private static final MvccCoordinatorVersion ISOLATED_STREAMER_MVCC_VER = - new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.START_VER, 0L); + new MvccCoordinatorVersionWithoutTxs(1L, CacheCoordinatorsProcessor.MVCC_START_CNTR, 0L); /** Cache receiver. */ private StreamReceiver<K, V> rcvr = ISOLATED_UPDATER; http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 74157d8..c70b73d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -2324,7 +2324,7 @@ public class GridQueryProcessor extends GridProcessorAdapter { public void remove(GridCacheContext cctx, CacheDataRow val, @Nullable MvccCoordinatorVersion newVer) throws IgniteCheckedException { assert val != null; - assert !cctx.mvccEnabled() || newVer == null; + assert cctx.mvccEnabled() || newVer == null; if (log.isDebugEnabled()) log.debug("Remove [cacheName=" + cctx.name() + ", key=" + val.key()+ ", val=" + val.value() + "]"); http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java index 999144f..3078655 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccAbstractTest.java @@ -378,7 +378,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { for (List<?> row : cache.cache.query(qry)) { Integer id = (Integer)row.get(0); - Integer val = (Integer)row.get(0); + Integer val = (Integer)row.get(1); MvccTestAccount old = accounts.put(id, new MvccTestAccount(val, 1)); @@ -713,7 +713,7 @@ public abstract class CacheMvccAbstractTest extends GridCommonAbstractTest { * @param node Node. * @throws Exception If failed. */ - final void checkActiveQueriesCleanup(Ignite node) throws Exception { + protected final void checkActiveQueriesCleanup(Ignite node) throws Exception { final CacheCoordinatorsProcessor crd = ((IgniteKernal)node).context().cache().context().coordinators(); assertTrue("Active queries not cleared: " + node.name(), GridTestUtils.waitForCondition( http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java index 1c0efb3..4518d14 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexIterator.java @@ -25,8 +25,10 @@ import java.util.NoSuchElementException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; import org.h2.index.Cursor; import org.h2.result.Row; +import org.jetbrains.annotations.Nullable; /** * Iterator that transparently and sequentially traverses a bunch of {@link GridMergeIndex} objects. @@ -59,6 +61,9 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable { /** Whether remote resources were released. */ private boolean released; + /** */ + private MvccQueryTracker mvccTracker; + /** * Constructor. * @@ -69,14 +74,19 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable { * @param distributedJoins Distributed joins. * @throws IgniteCheckedException if failed. */ - GridMergeIndexIterator(GridReduceQueryExecutor rdcExec, Collection<ClusterNode> nodes, ReduceQueryRun run, - long qryReqId, boolean distributedJoins) + GridMergeIndexIterator(GridReduceQueryExecutor rdcExec, + Collection<ClusterNode> nodes, + ReduceQueryRun run, + long qryReqId, + boolean distributedJoins, + @Nullable MvccQueryTracker mvccTracker) throws IgniteCheckedException { this.rdcExec = rdcExec; this.nodes = nodes; this.run = run; this.qryReqId = qryReqId; this.distributedJoins = distributedJoins; + this.mvccTracker = mvccTracker; this.idxIter = run.indexes().iterator(); @@ -155,7 +165,7 @@ class GridMergeIndexIterator implements Iterator<List<?>>, AutoCloseable { private void releaseIfNeeded() { if (!released) { try { - rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins); + rdcExec.releaseRemoteResources(nodes, run, qryReqId, distributedJoins, mvccTracker); } finally { released = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java index debba5e..80b1970 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java @@ -566,7 +566,7 @@ public class GridReduceQueryExecutor { List<Integer> cacheIds = qry.cacheIds(); - MvccCoordinatorVersion mvccVer = null; + MvccQueryTracker mvccTracker = null; // TODO IGNITE-3478. if (qry.mvccEnabled()) { @@ -574,7 +574,7 @@ public class GridReduceQueryExecutor { final GridFutureAdapter<Void> fut = new GridFutureAdapter<>(); - MvccQueryTracker mvccTracker = new MvccQueryTracker(cacheContext(cacheIds.get(0)), true, + mvccTracker = new MvccQueryTracker(cacheContext(cacheIds.get(0)), true, new IgniteBiInClosure<AffinityTopologyVersion, IgniteCheckedException>() { @Override public void apply(AffinityTopologyVersion topVer, IgniteCheckedException e) { fut.onDone(null, e); @@ -585,8 +585,6 @@ public class GridReduceQueryExecutor { try { fut.get(); - - mvccVer = mvccTracker.mvccVersion(); } catch (IgniteCheckedException e) { throw new CacheException(e); @@ -759,8 +757,10 @@ public class GridReduceQueryExecutor { .parameters(params) .flags(flags) .timeout(timeoutMillis) - .schemaName(schemaName) - .mvccVersion(mvccVer); + .schemaName(schemaName); + + if (mvccTracker != null) + req.mvccVersion(mvccTracker.mvccVersion()); if (send(nodes, req, parts == null ? null : new ExplicitPartitionsSpecializer(qryMap), false)) { awaitAllReplies(r, nodes, cancel); @@ -795,7 +795,12 @@ public class GridReduceQueryExecutor { if (!retry) { if (skipMergeTbl) { - resIter = new GridMergeIndexIterator(this, finalNodes, r, qryReqId, qry.distributedJoins()); + resIter = new GridMergeIndexIterator(this, + finalNodes, + r, + qryReqId, + qry.distributedJoins(), + mvccTracker); release = false; } @@ -865,7 +870,7 @@ public class GridReduceQueryExecutor { } finally { if (release) { - releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins()); + releaseRemoteResources(finalNodes, r, qryReqId, qry.distributedJoins(), mvccTracker); if (!skipMergeTbl) { for (int i = 0, mapQrys = qry.mapQueries().size(); i < mapQrys; i++) @@ -1060,7 +1065,10 @@ public class GridReduceQueryExecutor { * @param distributedJoins Distributed join flag. */ public void releaseRemoteResources(Collection<ClusterNode> nodes, ReduceQueryRun r, long qryReqId, - boolean distributedJoins) { + boolean distributedJoins, MvccQueryTracker mvccTracker) { + if (mvccTracker != null) + mvccTracker.onQueryDone(); + // For distributedJoins need always send cancel request to cleanup resources. if (distributedJoins) send(nodes, new GridQueryCancelRequest(qryReqId), null, false); http://git-wip-us.apache.org/repos/asf/ignite/blob/93bb0bc7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java index 66ae606..c14fe65 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java @@ -18,9 +18,15 @@ package org.apache.ignite.internal.processors.cache.mvcc; import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; @@ -33,6 +39,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** * TODO IGNITE-3478: text/spatial indexes with mvcc. * TODO IGNITE-3478: dynamic index create. + * TODO IGNITE-3478: tests with/without inline. */ @SuppressWarnings("unchecked") public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { @@ -42,7 +49,7 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { public void testAccountsTxSql_SingleNode_SinglePartition() throws Exception { accountsTxReadAll(1, 0, 0, 1, new IgniteInClosure<CacheConfiguration>() { @Override public void apply(CacheConfiguration ccfg) { - ccfg.setIndexedTypes(Integer.class, MvccTestAccount.class); + ccfg.setIndexedTypes(Integer.class, MvccTestAccount.class).setSqlIndexMaxInlineSize(0); } }, false, ReadMode.SQL_ALL); } @@ -50,35 +57,236 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { /** * @throws Exception If failed. */ + public void testAccountsTxSql_WithRemoves_SingleNode_SinglePartition() throws Exception { + accountsTxReadAll(1, 0, 0, 1, new IgniteInClosure<CacheConfiguration>() { + @Override public void apply(CacheConfiguration ccfg) { + ccfg.setIndexedTypes(Integer.class, MvccTestAccount.class).setSqlIndexMaxInlineSize(0); + } + }, true, ReadMode.SQL_ALL); + } + + /** + * @throws Exception If failed. + */ + public void testAccountsTxSql_SingleNode() throws Exception { + accountsTxReadAll(1, 0, 0, 64, new IgniteInClosure<CacheConfiguration>() { + @Override public void apply(CacheConfiguration ccfg) { + ccfg.setIndexedTypes(Integer.class, MvccTestAccount.class).setSqlIndexMaxInlineSize(0); + } + }, false, ReadMode.SQL_ALL); + } + + /** + * @throws Exception If failed. + */ + public void testAccountsTxSql_WithRemoves_SingleNode() throws Exception { + accountsTxReadAll(1, 0, 0, 64, new IgniteInClosure<CacheConfiguration>() { + @Override public void apply(CacheConfiguration ccfg) { + ccfg.setIndexedTypes(Integer.class, MvccTestAccount.class).setSqlIndexMaxInlineSize(0); + } + }, true, ReadMode.SQL_ALL); + } + + /** + * @throws Exception If failed. + */ public void testSqlSimple() throws Exception { Ignite srv0 = startGrid(0); IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache)srv0.createCache( cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). - setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class). - setSqlIndexMaxInlineSize(0)); + setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class). + setSqlIndexMaxInlineSize(0)); + + Map<Integer, Integer> expVals = new HashMap<>(); + + checkValues(expVals, cache); cache.put(1, new MvccTestSqlIndexValue(1)); + expVals.put(1, 1); + + checkValues(expVals, cache); + cache.put(1, new MvccTestSqlIndexValue(2)); + expVals.put(1, 2); + + checkValues(expVals, cache); cache.put(2, new MvccTestSqlIndexValue(1)); + expVals.put(2, 1); cache.put(3, new MvccTestSqlIndexValue(1)); + expVals.put(3, 1); cache.put(4, new MvccTestSqlIndexValue(1)); + expVals.put(4, 1); - SqlQuery<Integer, MvccTestSqlIndexValue> qry = - new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = 1"); + checkValues(expVals, cache); - List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res; + cache.remove(1); + expVals.remove(1); - res = cache.query(qry).getAll(); + checkValues(expVals, cache); - assertEquals(1, res.size()); + checkNoValue(1, cache); - cache.remove(1); + cache.put(1, new MvccTestSqlIndexValue(10)); + expVals.put(1, 10); + + checkValues(expVals, cache); + + checkActiveQueriesCleanup(srv0); + } + + /** + * @throws Exception If failed. + */ + public void testSqlSimplePutRemoveRandom() throws Exception { + Ignite srv0 = startGrid(0); + + IgniteCache<Integer, MvccTestSqlIndexValue> cache = (IgniteCache) srv0.createCache( + cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT). + setIndexedTypes(Integer.class, MvccTestSqlIndexValue.class). + setSqlIndexMaxInlineSize(0)); + + Map<Integer, Integer> expVals = new HashMap<>(); + + final int KEYS = 100; + final int VALS = 10; + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + long stopTime = System.currentTimeMillis() + 10_000; + + for (int i = 0; i < 100_000; i++) { + Integer key = rnd.nextInt(KEYS); + + if (rnd.nextInt(5) == 0) { + cache.remove(key); + + expVals.remove(key); + } + else { + Integer val = rnd.nextInt(VALS); + + cache.put(key, new MvccTestSqlIndexValue(val)); + + expVals.put(key, val); + } - res = cache.query(qry).getAll(); + checkValues(expVals, cache); - assertEquals(0, res.size()); + if (System.currentTimeMillis() > stopTime) { + info("Stop test, iteration: " + i); + + break; + } + } + + for (int i = 0; i < KEYS; i++) { + if (!expVals.containsKey(i)) + checkNoValue(i, cache); + } + + checkActiveQueriesCleanup(srv0); + } + + /** + * @param key Key. + * @param cache Cache. + */ + private void checkNoValue(Object key, IgniteCache cache) { + SqlQuery<Integer, MvccTestSqlIndexValue> qry; + + qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = ?"); + + qry.setArgs(key); + + List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = cache.query(qry).getAll(); + + assertTrue(res.isEmpty()); + } + + /** + * @param expVals Expected values. + * @param cache Cache. + */ + private void checkValues(Map<Integer, Integer> expVals, IgniteCache<Integer, MvccTestSqlIndexValue> cache) { + SqlQuery<Integer, MvccTestSqlIndexValue> qry; + + qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "true"); + + Map<Integer, Integer> vals = new HashMap<>(); + + for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll()) + assertNull(vals.put(e.getKey(), e.getValue().idxVal1)); + + assertEquals(expVals, vals); + + qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key >= 0"); + + vals = new HashMap<>(); + + for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll()) + assertNull(vals.put(e.getKey(), e.getValue().idxVal1)); + + assertEquals(expVals, vals); + + qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "idxVal1 >= 0"); + + vals = new HashMap<>(); + + for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll()) + assertNull(vals.put(e.getKey(), e.getValue().idxVal1)); + + assertEquals(expVals, vals); + + Map<Integer, Set<Integer>> expIdxVals = new HashMap<>(); + + for (Map.Entry<Integer, Integer> e : expVals.entrySet()) { + qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "_key = ?"); + + qry.setArgs(e.getKey()); + + List<IgniteCache.Entry<Integer, MvccTestSqlIndexValue>> res = cache.query(qry).getAll(); + + assertEquals(1, res.size()); + assertEquals(e.getKey(), res.get(0).getKey()); + assertEquals(e.getValue(), (Integer)res.get(0).getValue().idxVal1); + + SqlFieldsQuery fieldsQry = new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where _key=?"); + fieldsQry.setArgs(e.getKey()); + + List<List<?>> fieldsRes = cache.query(fieldsQry).getAll(); + + assertEquals(1, fieldsRes.size()); + assertEquals(e.getKey(), fieldsRes.get(0).get(0)); + assertEquals(e.getValue(), fieldsRes.get(0).get(1)); + + Integer val = e.getValue(); + + Set<Integer> keys = expIdxVals.get(val); + + if (keys == null) + expIdxVals.put(val, keys = new HashSet<>()); + + assertTrue(keys.add(e.getKey())); + } + + for (Map.Entry<Integer, Set<Integer>> expE : expIdxVals.entrySet()) { + qry = new SqlQuery<>(MvccTestSqlIndexValue.class, "idxVal1 = ?"); + qry.setArgs(expE.getKey()); + + vals = new HashMap<>(); + + for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.query(qry).getAll()) { + assertNull(vals.put(e.getKey(), e.getValue().idxVal1)); + + assertEquals(expE.getKey(), (Integer)e.getValue().idxVal1); + + assertTrue(expE.getValue().contains(e.getKey())); + } + + assertEquals(expE.getValue().size(), vals.size()); + } } /**
