Repository: ignite Updated Branches: refs/heads/ignite-5937 4cadfc92d -> 739417afb
ignite-5937 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/739417af Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/739417af Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/739417af Branch: refs/heads/ignite-5937 Commit: 739417afb3cb47f6ada06135ad9ce66c7e1cb0a8 Parents: 4cadfc9 Author: sboikov <[email protected]> Authored: Wed Oct 25 13:34:28 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Oct 25 14:40:36 2017 +0300 ---------------------------------------------------------------------- .../query/h2/database/H2TreeIndex.java | 8 +- .../query/h2/opt/GridH2IndexBase.java | 21 +- .../cache/mvcc/CacheMvccSqlQueriesTest.java | 234 +++++++++++++++++-- 3 files changed, 235 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/739417af/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java index f6ca9e8..63b33af 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java @@ -372,11 +372,13 @@ public class H2TreeIndex extends GridH2IndexBase { @Override protected GridCursor<GridH2Row> doFind0( IgniteTree t, @Nullable SearchRow first, - boolean includeFirst, @Nullable SearchRow last, - IndexingQueryFilter filter) { + IndexingQueryFilter filter, + H2TreeMvccFilterClosure mvccFilter) { try { - GridCursor<GridH2Row> range = t.find(first, last); + assert !cctx.mvccEnabled() || mvccFilter != null; + + GridCursor<GridH2Row> range = ((BPlusTree)t).find(first, last, mvccFilter, null); if (range == null) return EMPTY_CURSOR; http://git-wip-us.apache.org/repos/asf/ignite/blob/739417af/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index b865e00..96b331a 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.query.h2.database.H2TreeMvccFilterClosure; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse; import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowMessage; @@ -432,7 +433,7 @@ public abstract class GridH2IndexBase extends BaseIndex { // This is the first request containing all the search rows. assert !msg.bounds().isEmpty() : "empty bounds"; - src = new RangeSource(msg.bounds(), msg.segment(), qctx.filter()); + src = new RangeSource(msg.bounds(), msg.segment(), qctx.filter(), qctx.mvccFilter()); } else { // This is request to fetch next portion of data. @@ -1475,20 +1476,28 @@ public abstract class GridH2IndexBase extends BaseIndex { /** */ final IndexingQueryFilter filter; + /** */ + private final H2TreeMvccFilterClosure mvccFilter; + /** Iterator. */ Iterator<GridH2Row> iter = emptyIterator(); /** * @param bounds Bounds. + * @param segment Segment. * @param filter Filter. + * @param mvccFilter Mvcc filter. */ RangeSource( Iterable<GridH2RowRangeBounds> bounds, int segment, - IndexingQueryFilter filter + IndexingQueryFilter filter, + H2TreeMvccFilterClosure mvccFilter ) { this.segment = segment; this.filter = filter; + this.mvccFilter = mvccFilter; + boundsIter = bounds.iterator(); } @@ -1546,7 +1555,7 @@ public abstract class GridH2IndexBase extends BaseIndex { IgniteTree t = treeForRead(segment); - iter = new CursorIteratorWrapper(doFind0(t, first, true, last, filter)); + iter = new CursorIteratorWrapper(doFind0(t, first, last, filter, mvccFilter)); if (!iter.hasNext()) { // We have to return empty range here. @@ -1571,17 +1580,17 @@ public abstract class GridH2IndexBase extends BaseIndex { /** * @param t Tree. * @param first Lower bound. - * @param includeFirst Whether lower bound should be inclusive. * @param last Upper bound always inclusive. * @param filter Filter. + * @param mvccFilter Mvcc filter. * @return Iterator over rows in given range. */ protected GridCursor<GridH2Row> doFind0( IgniteTree t, @Nullable SearchRow first, - boolean includeFirst, @Nullable SearchRow last, - IndexingQueryFilter filter) { + IndexingQueryFilter filter, + H2TreeMvccFilterClosure mvccFilter) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/739417af/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java index ebe24c9..e77a3f1 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccSqlQueriesTest.java @@ -350,12 +350,12 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { * @param distributedJoin {@code True} to test distributed joins. * @throws Exception If failed. */ - private void joinTransactional(boolean singleNode, boolean distributedJoin) throws Exception { + private void joinTransactional(boolean singleNode, final boolean distributedJoin) throws Exception { final int KEYS = 100; - final int writers = 1; + final int writers = 4; - final int readers = 1; + final int readers = 4; GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { @@ -373,12 +373,12 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { Integer key = rnd.nextInt(KEYS); - JoinTestChildKey childKey = new JoinTestChildKey(rnd.nextInt(KEYS)); + JoinTestChildKey childKey = new JoinTestChildKey(key); JoinTestChild child = (JoinTestChild)cache.cache.get(childKey); if (child == null) { - Integer parentKey = key; + Integer parentKey = distributedJoin ? key + 100 : key; child = new JoinTestChild(parentKey); @@ -396,6 +396,8 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { tx.commit(); } + + cnt++; } finally { cache.readUnlock(); @@ -411,27 +413,51 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { ThreadLocalRandom rnd = ThreadLocalRandom.current(); - SqlFieldsQuery qry = new SqlFieldsQuery("select c.parentId, p.id from " + - "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id)"); + List<SqlFieldsQuery> qrys = new ArrayList<>(); + + qrys.add(new SqlFieldsQuery("select c.parentId, p.id from " + + "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id)"). + setDistributedJoins(distributedJoin)); + + qrys.add(new SqlFieldsQuery("select c.parentId, p.id from " + + "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id) where p.id = 10"). + setDistributedJoins(distributedJoin)); + + qrys.add(new SqlFieldsQuery("select c.parentId, p.id from " + + "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id) where p.id != 10"). + setDistributedJoins(distributedJoin)); while (!stop.get()) { TestCache<Object, Object> cache = randomCache(caches, rnd); - List<List<?>> res; - try { - res = cache.cache.query(qry).getAll(); + for (SqlFieldsQuery qry : qrys) { + List<List<?>> res = cache.cache.query(qry).getAll(); + + if (!res.isEmpty()) { + for (List<?> resRow : res) { + Integer parentId = (Integer)resRow.get(1); + + assertNotNull(parentId); + } + } + } } finally { cache.readUnlock(); } + } - if (!res.isEmpty()) { - for (List<?> resRow : res) { - Integer parentId = (Integer)resRow.get(1); + if (idx == 0) { + TestCache<Object, Object> cache = randomCache(caches, rnd); - assertNotNull(parentId); - } + try { + List<List<?>> res = cache.cache.query(qrys.get(0)).getAll(); + + info("Reader finished, result: " + res); + } + finally { + cache.readUnlock(); } } } @@ -468,6 +494,175 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { /** * @throws Exception If failed. */ + public void testJoinTransactional_DistributedJoins_ClientServer2() throws Exception { + final int KEYS = 100; + + final int writers = 1; + + final int readers = 4; + + final int CHILDREN_CNT = 10; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> writer = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + int cnt = 0; + + while (!stop.get()) { + TestCache<Object, Object> cache = randomCache(caches, rnd); + + IgniteTransactions txs = cache.cache.unwrap(Ignite.class).transactions(); + + try { + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + Integer key = rnd.nextInt(KEYS); + + JoinTestParentKey parentKey = new JoinTestParentKey(key); + + JoinTestParent parent = (JoinTestParent)cache.cache.get(parentKey); + + if (parent == null) { + for (int i = 0; i < CHILDREN_CNT; i++) + cache.cache.put(new JoinTestChildKey(key * 10_000 + i), new JoinTestChild(key)); + + cache.cache.put(parentKey, new JoinTestParent(key)); + } + else { + for (int i = 0; i < CHILDREN_CNT; i++) + cache.cache.remove(new JoinTestChildKey(key * 10_000 + i)); + + cache.cache.remove(parentKey); + } + + tx.commit(); + } + + cnt++; + } + finally { + cache.readUnlock(); + } + } + + info("Writer finished, updates: " + cnt); + } + }; + + GridInClosure3<Integer, List<TestCache>, AtomicBoolean> reader = + new GridInClosure3<Integer, List<TestCache>, AtomicBoolean>() { + @Override public void apply(Integer idx, List<TestCache> caches, AtomicBoolean stop) { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + SqlFieldsQuery qry = new SqlFieldsQuery("select c.parentId, p.id from " + + "JoinTestChild c left outer join JoinTestParent p on (c.parentId = p.id) where p.id=?"). + setDistributedJoins(true); + + int cnt = 0; + + while (!stop.get()) { + TestCache<Object, Object> cache = randomCache(caches, rnd); + + qry.setArgs(rnd.nextInt(KEYS)); + + try { + List<List<?>> res = cache.cache.query(qry).getAll(); + + if (!res.isEmpty()) + assertEquals(CHILDREN_CNT, res.size()); + + cnt++; + } + finally { + cache.readUnlock(); + } + } + + info("Reader finished, read count: " + cnt); + } + }; + + readWriteTest( + null, + 4, + 2, + 0, + DFLT_PARTITION_COUNT, + writers, + readers, + DFLT_TEST_TIME, + new InitIndexing(JoinTestParentKey.class, JoinTestParent.class, + JoinTestChildKey.class, JoinTestChild.class), + null, + writer, + reader); + } + + /** + * @throws Exception If failed. + */ + public void testDistributedJoinSimple() throws Exception { + startGridsMultiThreaded(4); + + Ignite srv0 = ignite(0); + + int[] backups = {0, 1, 2}; + + for (int b : backups) { + IgniteCache<Object, Object> cache = srv0.createCache( + cacheConfiguration(PARTITIONED, FULL_SYNC, b, DFLT_PARTITION_COUNT). + setIndexedTypes(JoinTestParentKey.class, JoinTestParent.class, JoinTestChildKey.class, JoinTestChild.class)); + + int cntr = 0; + + int expCnt = 0; + + for (int i = 0; i < 10; i++) { + JoinTestParentKey parentKey = new JoinTestParentKey(i); + + cache.put(parentKey, new JoinTestParent(i)); + + for (int c = 0; c < i; c++) { + JoinTestChildKey childKey = new JoinTestChildKey(cntr++); + + cache.put(childKey, new JoinTestChild(i)); + + expCnt++; + } + } + + SqlFieldsQuery qry = new SqlFieldsQuery("select c.parentId, p.id from " + + "JoinTestChild c join JoinTestParent p on (c.parentId = p.id)"). + setDistributedJoins(true); + + Map<Integer, Integer> resMap = new HashMap<>(); + + List<List<?>> res = cache.query(qry).getAll(); + + assertEquals(expCnt, res.size()); + + for (List<?> resRow : res) { + Integer parentId = (Integer)resRow.get(0); + + Integer cnt = resMap.get(parentId); + + if (cnt == null) + resMap.put(parentId, 1); + else + resMap.put(parentId, cnt + 1); + } + + for (int i = 1; i < 10; i++) + assertEquals(i, (Object)resMap.get(i)); + + srv0.destroyCache(cache.getName()); + } + } + + /** + * @throws Exception If failed. + */ public void testCacheRecreate() throws Exception { cacheRecreate(new InitIndexing(Integer.class, MvccTestAccount.class)); } @@ -1260,7 +1455,7 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { /** * @param key Key. */ - public JoinTestParentKey(int key) { + JoinTestParentKey(int key) { this.key = key; } @@ -1294,7 +1489,7 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { /** * @param id ID. */ - public JoinTestParent(int id) { + JoinTestParent(int id) { this.id = id; } @@ -1309,12 +1504,13 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { */ static class JoinTestChildKey implements Serializable { /** */ + @QuerySqlField(index = true) private int key; /** * @param key Key. */ - public JoinTestChildKey(int key) { + JoinTestChildKey(int key) { this.key = key; } @@ -1348,7 +1544,7 @@ public class CacheMvccSqlQueriesTest extends CacheMvccAbstractTest { /** * @param parentId Parent ID. */ - public JoinTestChild(int parentId) { + JoinTestChild(int parentId) { this.parentId = parentId; }
