Repository: ignite Updated Branches: refs/heads/ignite-5937 d381cdec8 -> 0512c072d
ignite-5937 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0512c072 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0512c072 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0512c072 Branch: refs/heads/ignite-5937 Commit: 0512c072dbc68c65da708f312f6301cf93fdeed0 Parents: d381cde Author: sboikov <[email protected]> Authored: Tue Oct 24 16:19:50 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Oct 24 17:32:40 2017 +0300 ---------------------------------------------------------------------- .../cache/mvcc/CacheMvccSqlQueriesTest.java | 389 +++++++++++++++++-- 1 file changed, 359 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0512c072/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java index 9ac7d21..ebe24c9 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java @@ -31,21 +31,27 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.query.SqlQuery; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.util.lang.GridInClosure3; +import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.transactions.Transaction; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * TODO IGNITE-3478: text/spatial indexes with mvcc. * TODO IGNITE-3478: indexingSpi with mvcc. + * TODO IGNITE-3478: setQueryParallelism with mvcc. * TODO IGNITE-3478: dynamic index create. */ @SuppressWarnings("unchecked") @@ -96,21 +102,29 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { * @throws Exception If failed. */ public void testUpdateSingleValue_SingleNode() throws Exception { - updateSingleValue(true); + updateSingleValue(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testUpdateSingleValue_LocalQuery_SingleNode() throws Exception { + updateSingleValue(true, true); } /** * @throws Exception If failed. */ public void testUpdateSingleValue_ClientServer() throws Exception { - updateSingleValue(false); + updateSingleValue(false, false); } /** * @param singleNode {@code True} for test with single node. + * @param locQry Local query flag. * @throws Exception If failed. */ - private void updateSingleValue(boolean singleNode) throws Exception { + private void updateSingleValue(boolean singleNode, final boolean locQry) throws Exception { final int VALS = 100; final int writers = 4; @@ -182,16 +196,22 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); - SqlFieldsQuery[] qrys = new SqlFieldsQuery[3]; + List<SqlFieldsQuery> fieldsQrys = new ArrayList<>(); + + fieldsQrys.add( + new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=?").setLocal(locQry)); + + fieldsQrys.add(new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=? or idxVal1=?").setLocal(locQry)); - qrys[0] = new SqlFieldsQuery( - "select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=?"); + fieldsQrys.add(new SqlFieldsQuery("select _key, idxVal1 from MvccTestSqlIndexValue where _key=?").setLocal(locQry)); - qrys[1] = new SqlFieldsQuery( - "select _key, idxVal1 from MvccTestSqlIndexValue where idxVal1=? or idxVal1=?"); + List<SqlQuery<Integer, MvccTestSqlIndexValue>> sqlQrys = new ArrayList<>(); - qrys[2] = new SqlFieldsQuery( - "select _key, idxVal1 from MvccTestSqlIndexValue where _key=?"); + sqlQrys.add(new SqlQuery<Integer, MvccTestSqlIndexValue>(MvccTestSqlIndexValue.class, "idxVal1=?").setLocal(locQry)); + + sqlQrys.add(new SqlQuery<Integer, MvccTestSqlIndexValue>(MvccTestSqlIndexValue.class, "idxVal1=? or idxVal1=?").setLocal(locQry)); + + sqlQrys.add(new SqlQuery<Integer, MvccTestSqlIndexValue>(MvccTestSqlIndexValue.class, "_key=?").setLocal(locQry)); while (!stop.get()) { Integer key = rnd.nextInt(VALS); @@ -203,14 +223,35 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { List<List<?>> res; try { - SqlFieldsQuery qry = qrys[qryIdx]; + if (rnd.nextBoolean()) { + SqlFieldsQuery qry = fieldsQrys.get(qryIdx); - if (qryIdx == 1) - qry.setArgs(key, key + INC_BY); - else - qry.setArgs(key); + if (qryIdx == 1) + qry.setArgs(key, key + INC_BY); + else + qry.setArgs(key); - res = cache.cache.query(qry).getAll(); + res = cache.cache.query(qry).getAll(); + } + else { + SqlQuery<Integer, MvccTestSqlIndexValue> qry = sqlQrys.get(qryIdx); + + if (qryIdx == 1) + qry.setArgs(key, key + INC_BY); + else + qry.setArgs(key); + + res = new ArrayList<>(); + + for (IgniteCache.Entry<Integer, MvccTestSqlIndexValue> e : cache.cache.query(qry).getAll()) { + List<Object> row = new ArrayList<>(2); + + row.add(e.getKey()); + row.add(e.getValue().idxVal1); + + res.add(row); + } + } } finally { cache.readUnlock(); @@ -278,6 +319,150 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { init, writer, reader); + + for (Ignite node : G.allGrids()) + checkActiveQueriesCleanup(node); + } + + /** + * @throws Exception If failed. + */ + public void testJoinTransactional_SingleNode() throws Exception { + joinTransactional(true, false); + } + + /** + * @throws Exception If failed. + */ + public void testJoinTransactional_ClientServer() throws Exception { + joinTransactional(false, false); + } + + /** + * @throws Exception If failed. + */ + public void testJoinTransactional_DistributedJoins_ClientServer() throws Exception { + joinTransactional(false, true); + } + + /** + * @param singleNode {@code True} for test with single node. + * @param distributedJoin {@code True} to test distributed joins. + * @throws Exception If failed. + */ + private void joinTransactional(boolean singleNode, boolean distributedJoin) throws Exception { + final int KEYS = 100; + + final int writers = 1; + + final int readers = 1; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int cnt = 0; + + while (!stop.get()) { + TestCache<Object, Object> cache = randomCache(caches, rnd); + + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); + + try { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Integer key = rnd.nextInt(KEYS); + + JoinTestChildKey childKey = new JoinTestChildKey(rnd.nextInt(KEYS)); + + JoinTestChild child = (JoinTestChild)cache.cache.get(childKey); + + if (child == null) { + Integer parentKey = key; + + child = new JoinTestChild(parentKey); + + cache.cache.put(childKey, child); + + JoinTestParent parent = new JoinTestParent(parentKey); + + cache.cache.put(new JoinTestParentKey(parentKey), parent); + } + else { + cache.cache.remove(childKey); + + cache.cache.remove(new JoinTestParentKey(child.parentId)); + } + + tx.commit(); + } + } + finally { + cache.readUnlock(); + } + } + + info("Writer finished, updates: " + cnt); + } + }; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + SqlFieldsQuery qry = new SqlFieldsQuery("select c.parentId, p.id from " + + "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id)"); + + while (!stop.get()) { + TestCache<Object, Object> cache = randomCache(caches, rnd); + + List<List<?>> res; + + try { + res = cache.cache.query(qry).getAll(); + } + finally { + cache.readUnlock(); + } + + if (!res.isEmpty()) { + for (List<?> resRow : res) { + Integer parentId = (Integer)resRow.get(1); + + assertNotNull(parentId); + } + } + } + } + }; + + int srvs; + int clients; + + if (singleNode) { + srvs = 1; + clients = 0; + } + else { + srvs = 4; + clients = 2; + } + + readWriteTest( + null, + srvs, + clients, + 0, + DFLT_PARTITION_COUNT, + writers, + readers, + DFLT_TEST_TIME, + new InitIndexing(JoinTestParentKey.class, JoinTestParent.class, + JoinTestChildKey.class, JoinTestChild.class), + null, + writer, + reader); } /** @@ -740,33 +925,69 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { for (int i = 0; i < 10; i++) cache.put(i, new MvccTestSqlIndexValue(i)); - checkSingleResult(cache, new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue"), 9); + sqlQueriesWithMvcc(cache, true); + + sqlQueriesWithMvcc(cache, false); - checkSingleResult(cache, new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 > 0"), 9); + startGrid(1); - checkSingleResult(cache, new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 5"), 4); + awaitPartitionMapExchange(); - checkSingleResult(cache, new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue"), 0); + sqlQueriesWithMvcc(cache, false); + } + + /** + * @param cache Cache. + * @param loc Local query flag. + */ + private void sqlQueriesWithMvcc(IgniteCache<Integer, MvccTestSqlIndexValue> cache, boolean loc) { + assertEquals(10, + cache.query(new SqlQuery<>(MvccTestSqlIndexValue.class, "true").setLocal(loc)).getAll().size()); + + assertEquals(10, + cache.query(new SqlFieldsQuery("select idxVal1 from MvccTestSqlIndexValue").setLocal(loc)).getAll().size()); + + checkSingleResult(cache, + new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue").setLocal(loc), 9); - checkSingleResult(cache, new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 100"), 0); + checkSingleResult(cache, + new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 > 0").setLocal(loc), 9); - checkSingleResult(cache, new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 5"), 0); + checkSingleResult(cache, + new SqlFieldsQuery("select max(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 5").setLocal(loc), 4); - checkSingleResult(cache, new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 > 5"), 6); + checkSingleResult(cache, + new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue").setLocal(loc), 0); - checkSingleResult(cache, new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue"), 10L); + checkSingleResult(cache, + new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 100").setLocal(loc), 0); - checkSingleResult(cache, new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0"), 10L); + checkSingleResult(cache, + new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 < 5").setLocal(loc), 0); - checkSingleResult(cache, new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0 and idxVal1 < 100"), 10L); + checkSingleResult(cache, + new SqlFieldsQuery("select min(idxVal1) from MvccTestSqlIndexValue where idxVal1 > 5").setLocal(loc), 6); - checkSingleResult(cache, new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >0 and idxVal1 < 5"), 4L); + checkSingleResult(cache, + new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue").setLocal(loc), 10L); - checkSingleResult(cache, new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 1"), 9L); + checkSingleResult(cache, + new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0").setLocal(loc), 10L); - checkSingleResult(cache, new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 > 100"), 0L); + checkSingleResult(cache, + new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 0 and idxVal1 < 100").setLocal(loc), 10L); - checkSingleResult(cache, new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 = 1"), 1L); + checkSingleResult(cache, + new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >0 and idxVal1 < 5").setLocal(loc), 4L); + + checkSingleResult(cache, + new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 >= 1").setLocal(loc), 9L); + + checkSingleResult(cache, + new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 > 100").setLocal(loc), 0L); + + checkSingleResult(cache, + new SqlFieldsQuery("select count(*) from MvccTestSqlIndexValue where idxVal1 = 1").setLocal(loc), 1L); } /** @@ -1032,6 +1253,114 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { /** * */ + static class JoinTestParentKey implements Serializable { + /** */ + private int key; + + /** + * @param key Key. + */ + public JoinTestParentKey(int key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + JoinTestParentKey that = (JoinTestParentKey)o; + + return key == that.key; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key; + } + } + + /** + * + */ + static class JoinTestParent { + /** */ + @QuerySqlField(index = true) + private int id; + + /** + * @param id ID. + */ + public JoinTestParent(int id) { + this.id = id; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(JoinTestParent.class, this); + } + } + + /** + * + */ + static class JoinTestChildKey implements Serializable { + /** */ + private int key; + + /** + * @param key Key. + */ + public JoinTestChildKey(int key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + JoinTestChildKey that = (JoinTestChildKey)o; + + return key == that.key; + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key; + } + } + + /** + * + */ + static class JoinTestChild { + /** */ + @QuerySqlField(index = true) + private int parentId; + + /** + * @param parentId Parent ID. + */ + public JoinTestChild(int parentId) { + this.parentId = parentId; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(JoinTestChild.class, this); + } + } + + /** + * + */ static class MvccTestSqlIndexValue implements Serializable { /** */ @QuerySqlField(index = true)
