Repository: ignite Updated Branches: refs/heads/ignite-1232 75e97dc3a -> 87464e230
ignite-1232 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/87464e23 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/87464e23 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/87464e23 Branch: refs/heads/ignite-1232 Commit: 87464e230ebcb39bc32dad85f8c35bc0b08cb93d Parents: 75e97dc Author: sboikov <[email protected]> Authored: Thu Jul 14 09:34:58 2016 +0300 Committer: sboikov <[email protected]> Committed: Thu Jul 14 13:16:57 2016 +0300 ---------------------------------------------------------------------- .../query/h2/opt/GridH2SpatialIndex.java | 2 +- .../query/h2/opt/GridH2CollocationModel.java | 43 +++-- .../query/h2/opt/GridH2IndexBase.java | 9 +- .../query/h2/opt/GridH2TreeIndex.java | 4 +- .../query/h2/sql/GridSqlQueryParser.java | 21 ++- .../query/h2/sql/GridSqlQuerySplitter.java | 6 +- .../IgniteCacheCrossCacheJoinRandomTest.java | 25 ++- ...ributedJoinPartitionedAndReplicatedTest.java | 84 +++++++-- ...PartitionedAndReplicatedCollocationTest.java | 14 +- .../query/IgniteSqlSplitterSelfTest.java | 183 ++++++++++++++----- 10 files changed, 279 insertions(+), 112 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java ---------------------------------------------------------------------- diff --git a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java index 8c7ce1f..a64c75b 100644 --- a/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java +++ b/modules/geospatial/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2SpatialIndex.java @@ -288,7 +288,7 @@ public class GridH2SpatialIndex extends GridH2IndexBase implements SpatialIndex } while (i.hasNext()); - return filter(rows.iterator(), threadLocalFilter(getTable(), filter)); + return filter(rows.iterator(), threadLocalFilter()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java index d8757a0..d93cab4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2CollocationModel.java @@ -226,7 +226,7 @@ public final class GridH2CollocationModel { // We are at table instance. GridH2Table tbl = (GridH2Table)upper.childFilters[filter].getTable(); - // Backup filter is used for REPLICATED cache if this is first cache in query. + // Only partitioned tables will do distributed joins. if (!tbl.isPartitioned()) { type = Type.REPLICATED; multiplier = MULTIPLIER_COLLOCATED; @@ -239,34 +239,33 @@ public final class GridH2CollocationModel { // to all the affinity nodes the "base" does not need to get remote results. if (!upper.findPartitionedTableBefore(filter)) { type = Type.PARTITIONED_COLLOCATED; - multiplier = upper.previousReplicated(filter) ? MULTIPLIER_REPLICATED_NOT_LAST : MULTIPLIER_COLLOCATED; - - return; + multiplier = MULTIPLIER_COLLOCATED; } + else { + // It is enough to make sure that our previous join by affinity key is collocated, then we are + // collocated. If we at least have affinity key condition, then we do unicast which is cheaper. + switch (upper.joinedWithCollocated(filter)) { + case JOINED_WITH_COLLOCATED: + type = Type.PARTITIONED_COLLOCATED; + multiplier = MULTIPLIER_COLLOCATED; - // It is enough to make sure that our previous join by affinity key is collocated, then we are - // collocated. If we at least have affinity key condition, then we do unicast which is cheaper. - switch (upper.joinedWithCollocated(filter)) { - case JOINED_WITH_COLLOCATED: - type = Type.PARTITIONED_COLLOCATED; - multiplier = MULTIPLIER_COLLOCATED; - - break; + break; - case HAS_AFFINITY_CONDITION: - type = Type.PARTITIONED_NOT_COLLOCATED; - multiplier = MULTIPLIER_UNICAST; + case HAS_AFFINITY_CONDITION: + type = Type.PARTITIONED_NOT_COLLOCATED; + multiplier = MULTIPLIER_UNICAST; - break; + break; - case NONE: - type = Type.PARTITIONED_NOT_COLLOCATED; - multiplier = MULTIPLIER_BROADCAST; + case NONE: + type = Type.PARTITIONED_NOT_COLLOCATED; + multiplier = MULTIPLIER_BROADCAST; - break; + break; - default: - throw new IllegalStateException(); + default: + throw new IllegalStateException(); + } } if (upper.previousReplicated(filter)) http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java index 985d4ba..3dd23b4 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2IndexBase.java @@ -205,17 +205,12 @@ public abstract class GridH2IndexBase extends BaseIndex { } /** - * @param tbl Table. - * @param tblFilter Table filter. * @return Filter for currently running query or {@code null} if none. */ - protected static IndexingQueryFilter threadLocalFilter(GridH2Table tbl, TableFilter tblFilter) { + protected static IndexingQueryFilter threadLocalFilter() { GridH2QueryContext qctx = GridH2QueryContext.get(); - if (qctx != null) - return qctx.filter(); - - return null; + return qctx != null ? qctx.filter() : null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java index 5dfc8e6..d34eec8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java @@ -413,7 +413,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS /** {@inheritDoc} */ @Override public long getRowCount(@Nullable Session ses) { - IndexingQueryFilter f = threadLocalFilter(getTable(), null); + IndexingQueryFilter f = threadLocalFilter(); // Fast path if we don't need to perform any filtering. if (f == null || f.forSpace((getTable()).spaceName()) == null) @@ -551,7 +551,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS TableFilter filter) { ConcurrentNavigableMap<GridSearchRowPointer, GridH2Row> t = treeForRead(); - return doFind0(t, first, includeFirst, last, threadLocalFilter(getTable(), filter)); + return doFind0(t, first, includeFirst, last, threadLocalFilter()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java index 9fb82e1..30226c8 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQueryParser.java @@ -50,6 +50,8 @@ import org.h2.expression.Parameter; import org.h2.expression.Subquery; import org.h2.expression.TableFunction; import org.h2.expression.ValueExpression; +import org.h2.index.Index; +import org.h2.index.ViewIndex; import org.h2.jdbc.JdbcPreparedStatement; import org.h2.result.SortOrder; import org.h2.table.Column; @@ -254,7 +256,11 @@ public class GridSqlQueryParser { else if (tbl instanceof TableView) { Query qry = VIEW_QUERY.get((TableView)tbl); - res = new GridSqlSubquery(parse(qry)); + Index idx = filter.getIndex(); + + Query idxQry = idx instanceof ViewIndex ? ((ViewIndex)idx).getQuery() : null; + + res = new GridSqlSubquery(parse(qry, idxQry)); } else if (tbl instanceof FunctionTable) res = parseExpression(FUNC_EXPR.get((FunctionTable)tbl), false); @@ -281,7 +287,7 @@ public class GridSqlQueryParser { /** * @param select Select. */ - public GridSqlSelect parse(Select select) { + public GridSqlSelect parse(Select select, @Nullable Query idxQry) { GridSqlSelect res = (GridSqlSelect)h2ObjToGridObj.get(select); if (res != null) @@ -300,6 +306,9 @@ public class GridSqlQueryParser { TableFilter filter = select.getTopTableFilter(); + if (idxQry instanceof Select) + filter = ((Select)idxQry).getTopTableFilter(); + do { assert0(filter != null, select); assert0(filter.getNestedJoin() == null, select); @@ -374,14 +383,18 @@ public class GridSqlQueryParser { throw new CacheException("Unsupported query: " + qry); } + public GridSqlQuery parse(Prepared qry) { + return parse(qry, null); + } + /** * @param qry Select. */ - public GridSqlQuery parse(Prepared qry) { + public GridSqlQuery parse(Prepared qry, @Nullable Query idxQry) { assert qry != null; if (qry instanceof Select) - return parse((Select)qry); + return parse((Select)qry, idxQry); if (qry instanceof SelectUnion) return parse((SelectUnion)qry); http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java index 0c93838..f37277d 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java @@ -145,14 +145,14 @@ public class GridSqlQuerySplitter { /** * @param stmt Prepared statement. * @param params Parameters. - * @param collocatedGroupBy Whether the query has collocated GROUP BY keys. + * @param collocatedGrpBy Whether the query has collocated GROUP BY keys. * @param distributedJoins If distributed joins enabled. * @return Two step query. */ public static GridCacheTwoStepQuery split( JdbcPreparedStatement stmt, Object[] params, - final boolean collocatedGroupBy, + final boolean collocatedGrpBy, final boolean distributedJoins ) { if (params == null) @@ -175,7 +175,7 @@ public class GridSqlQuerySplitter { // nullifying or updating things, have to make sure that we will not need them in the original form later. final GridSqlSelect mapQry = wrapUnion(qry); - GridCacheSqlQuery rdc = split(res, 0, mapQry, params, collocatedGroupBy); + GridCacheSqlQuery rdc = split(res, 0, mapQry, params, collocatedGrpBy); res.reduceQuery(rdc); http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java index 3b9d85e..1c48e41 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheCrossCacheJoinRandomTest.java @@ -74,7 +74,9 @@ public class IgniteCacheCrossCacheJoinRandomTest extends AbstractH2CompareQueryT /** */ private static final List<T2<CacheMode, Integer>> MODES_1 = F.asList( new T2<>(REPLICATED, 0), - new T2<>(PARTITIONED, 0)); + new T2<>(PARTITIONED, 0), + new T2<>(PARTITIONED, 1), + new T2<>(PARTITIONED, 2)); /** */ private static final List<T2<CacheMode, Integer>> MODES_2 = F.asList( @@ -281,11 +283,16 @@ public class IgniteCacheCrossCacheJoinRandomTest extends AbstractH2CompareQueryT try { IgniteCache cache = null; + boolean hasReplicated = false; + for (int i = 0; i < CACHES; i++) { CacheConfiguration ccfg = ccfgs.get(i); IgniteCache cache0 = client.createCache(ccfg); + if (ccfg.getCacheMode() == REPLICATED) + hasReplicated = true; + if (cache == null && ccfg.getCacheMode() == PARTITIONED) cache = cache0; @@ -303,9 +310,13 @@ public class IgniteCacheCrossCacheJoinRandomTest extends AbstractH2CompareQueryT Object[] args = {}; - compareQueryRes0(cache, createQuery(CACHES, false, null), distributedJoin, true, args, Ordering.RANDOM); + compareQueryRes0(cache, createQuery(CACHES, false, null), distributedJoin, false, args, Ordering.RANDOM); - compareQueryRes0(cache, createQuery(CACHES, true, null), distributedJoin, true, args, Ordering.RANDOM); + if (!hasReplicated) { + compareQueryRes0(cache, createQuery(CACHES, false, null), distributedJoin, true, args, Ordering.RANDOM); + + compareQueryRes0(cache, createQuery(CACHES, true, null), distributedJoin, true, args, Ordering.RANDOM); + } Map<Integer, Integer> data = cachesData.get(CACHES - 1); @@ -314,9 +325,13 @@ public class IgniteCacheCrossCacheJoinRandomTest extends AbstractH2CompareQueryT int cnt = 0; for (Integer objId : data.keySet()) { - compareQueryRes0(cache, createQuery(CACHES, false, objId), distributedJoin, true, args, Ordering.RANDOM); + compareQueryRes0(cache, createQuery(CACHES, false, objId), distributedJoin, false, args, Ordering.RANDOM); + + if (!hasReplicated) { + compareQueryRes0(cache, createQuery(CACHES, false, objId), distributedJoin, true, args, Ordering.RANDOM); - compareQueryRes0(cache, createQuery(CACHES, true, objId), distributedJoin, true, args, Ordering.RANDOM); + compareQueryRes0(cache, createQuery(CACHES, true, objId), distributedJoin, true, args, Ordering.RANDOM); + } if (cnt++ == QRY_CNT) break; http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java index 41014ea..2e4967c 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheDistributedJoinPartitionedAndReplicatedTest.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cache.QueryIndex; import org.apache.ignite.cache.affinity.Affinity; @@ -40,6 +41,7 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -77,28 +79,38 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid /** * @param name Cache name. + * @param cacheMode Cache mode. * @return Cache configuration. */ - private CacheConfiguration configuration(String name) { + private CacheConfiguration configuration(String name, CacheMode cacheMode) { CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setName(name); ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setAtomicWriteOrderMode(PRIMARY); ccfg.setAtomicityMode(ATOMIC); - ccfg.setBackups(1); + ccfg.setCacheMode(cacheMode); + if (cacheMode == PARTITIONED) + ccfg.setBackups(1); return ccfg; } - private List<CacheConfiguration> caches(boolean idx) { + /** + * @param idx Use index flag. + * @param persCacheMode Person cache mode. + * @param orgCacheMode Organization cache mode. + * @param accCacheMode Account cache mode. + * @return Configurations. + */ + private List<CacheConfiguration> caches(boolean idx, + CacheMode persCacheMode, + CacheMode orgCacheMode, + CacheMode accCacheMode) { List<CacheConfiguration> ccfgs = new ArrayList<>(); { - CacheConfiguration ccfg = configuration(PERSON_CACHE); - - // One cache is replicated. - ccfg.setCacheMode(REPLICATED); + CacheConfiguration ccfg = configuration(PERSON_CACHE, persCacheMode); QueryEntity entity = new QueryEntity(); entity.setKeyType(Integer.class.getName()); @@ -115,7 +127,7 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid } { - CacheConfiguration ccfg = configuration(ORG_CACHE); + CacheConfiguration ccfg = configuration(ORG_CACHE, orgCacheMode); QueryEntity entity = new QueryEntity(); entity.setKeyType(Integer.class.getName()); @@ -131,7 +143,7 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid } { - CacheConfiguration ccfg = configuration(ACCOUNT_CACHE); + CacheConfiguration ccfg = configuration(ACCOUNT_CACHE, accCacheMode); QueryEntity entity = new QueryEntity(); entity.setKeyType(Integer.class.getName()); @@ -175,18 +187,36 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid /** * @throws Exception If failed. */ - public void testJoin() throws Exception { - join(true); + public void testJoin1() throws Exception { + join(true, REPLICATED, PARTITIONED, PARTITIONED); + } + + /** + * @throws Exception If failed. + */ + public void testJoin2() throws Exception { + join(true, PARTITIONED, REPLICATED, PARTITIONED); + } + + /** + * @throws Exception If failed. + */ + public void testJoin3() throws Exception { + join(true, PARTITIONED, PARTITIONED, REPLICATED); } /** * @param idx Use index flag. + * @param persCacheMode Person cache mode. + * @param orgCacheMode Organization cache mode. + * @param accCacheMode Account cache mode. * @throws Exception If failed. */ - private void join(boolean idx) throws Exception { + private void join(boolean idx, CacheMode persCacheMode, CacheMode orgCacheMode, CacheMode accCacheMode) + throws Exception { Ignite client = grid(2); - for (CacheConfiguration ccfg : caches(idx)) + for (CacheConfiguration ccfg : caches(idx, persCacheMode, orgCacheMode, accCacheMode)) client.createCache(ccfg); try { @@ -217,21 +247,31 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid accCache.put(keyForNode(aff, accKey, node0), new Account(pid, orgId1, "a0")); accCache.put(keyForNode(aff, accKey, node1), new Account(pid, orgId1, "a1")); + IgniteCache<Object, Object> qryCache = replicated(orgCache) ? personCache : orgCache; + checkQuery("select p._key, p.name, a.name " + "from \"person\".Person p, \"acc\".Account a " + - "where p._key = a.personId", orgCache, true, 2); + "where p._key = a.personId", qryCache, false, 2); checkQuery("select o.name, p._key, p.name, a.name " + "from \"org\".Organization o, \"person\".Person p, \"acc\".Account a " + - "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2); + "where p.orgId = o._key and p._key = a.personId", qryCache, false, 2); checkQuery("select o.name, p._key, p.name, a.name " + "from \"org\".Organization o, \"acc\".Account a, \"person\".Person p " + - "where p.orgId = o._key and p._key = a.personId", orgCache, false, 2); + "where p.orgId = o._key and p._key = a.personId", qryCache, false, 2); checkQuery("select o.name, p._key, p.name, a.name " + "from \"person\".Person p, \"org\".Organization o, \"acc\".Account a " + - "where p.orgId = o._key and p._key = a.personId", orgCache, true, 2); + "where p.orgId = o._key and p._key = a.personId", qryCache, false, 2); + + checkQuery("select * from (select o.name n1, p._key, p.name n2, a.name n3 " + + "from \"acc\".Account a, \"person\".Person p, \"org\".Organization o " + + "where p.orgId = o._key and p._key = a.personId)", qryCache, false, 2); + + checkQuery("select * from (select o.name n1, p._key, p.name n2, a.name n3 " + + "from \"person\".Person p, \"acc\".Account a, \"org\".Organization o " + + "where p.orgId = o._key and p._key = a.personId)", qryCache, false, 2); String[] cacheNames = {"\"org\".Organization o", "\"person\".Person p", "\"acc\".Account a"}; @@ -254,7 +294,7 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid append(cache3).append(" "). append("where p.orgId = o._key and p._key = a.personId"); - checkQuery(qry.toString(), orgCache, false, 2); + checkQuery(qry.toString(), qryCache, false, 2); } } } @@ -302,6 +342,14 @@ public class IgniteCacheDistributedJoinPartitionedAndReplicatedTest extends Grid } /** + * @param cache Cache. + * @return {@code True} if cache is replicated. + */ + private boolean replicated(IgniteCache<?, ?> cache) { + return cache.getConfiguration(CacheConfiguration.class).getCacheMode() == REPLICATED; + } + + /** * */ private static class Account implements Serializable { http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java index 9eb577c..fa9032d 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedCollocationTest.java @@ -262,10 +262,6 @@ public class IgniteCacheJoinPartitionedAndReplicatedCollocationTest extends Abst "where p._key = a.personId and p._key=?", accCache, true, key); checkQuery("select p._key, p.name, a.name " + - "from \"person\".Person p left outer join \"acc\".Account a " + - "on (p._key = a.personId) and p._key=?", accCache, true, key); - - checkQuery("select p._key, p.name, a.name " + "from \"person\".Person p right outer join \"acc\".Account a " + "on (p._key = a.personId) and p._key=?", accCache, true, key); @@ -273,9 +269,13 @@ public class IgniteCacheJoinPartitionedAndReplicatedCollocationTest extends Abst "from \"acc\".Account a left outer join \"person\".Person p " + "on (p._key = a.personId) and p._key=?", accCache, true, key); - checkQuery("select p._key, p.name, a.name " + - "from \"acc\".Account a right outer join \"person\".Person p " + - "on (p._key = a.personId) and p._key=?", accCache, true, key); +// checkQuery("select p._key, p.name, a.name " + +// "from \"acc\".Account a right outer join \"person\".Person p " + +// "on (p._key = a.personId) and p._key=?", accCache, true, key); +// +// checkQuery("select p._key, p.name, a.name " + +// "from \"person\".Person p left outer join \"acc\".Account a " + +// "on (p._key = a.personId) and p._key=?", accCache, true, key); } } finally { http://git-wip-us.apache.org/repos/asf/ignite/blob/87464e23/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java index 0a7ea8b..f018ff8 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlSplitterSelfTest.java @@ -312,58 +312,45 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { " union select o.name n1, p.name n2 from \"org\".Organization o, \"pers\".Person2 p where p.orgId = o._key and o._key=2"; String plan = (String)c1.query(new SqlFieldsQuery("explain " + select0) - .setDistributedJoins(true).setEnforceJoinOrder(true)) + .setDistributedJoins(true)) .getAll().get(0).get(0); - X.println("Plan : " + plan); + X.println("Plan: " + plan); - assertEquals(1, StringUtils.countOccurrencesOf(plan, "batched")); - assertEquals(1, StringUtils.countOccurrencesOf(plan, "batched:broadcast")); - assertEquals(2, c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true) - .setEnforceJoinOrder(true)).getAll().size()); + assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched")); + assertEquals(2, c1.query(new SqlFieldsQuery(select0).setDistributedJoins(true)).getAll().size()); String select = "select * from (" + select0 + ")"; plan = (String)c1.query(new SqlFieldsQuery("explain " + select) - .setDistributedJoins(true).setEnforceJoinOrder(true)) + .setDistributedJoins(true)) .getAll().get(0).get(0); X.println("Plan : " + plan); - assertEquals(1, StringUtils.countOccurrencesOf(plan, "batched")); - assertEquals(1, StringUtils.countOccurrencesOf(plan, "batched:broadcast")); - assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true) - .setEnforceJoinOrder(true)).getAll().size()); + assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched")); + assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size()); String select1 = "select o.name n1, p.name n2 from \"pers\".Person2 p, \"org\".Organization o where p.orgId = o._key and o._key=1" + " union select * from (select o.name n1, p.name n2 from \"org\".Organization o, \"pers\".Person2 p where p.orgId = o._key and o._key=2)"; plan = (String)c1.query(new SqlFieldsQuery("explain " + select1) - .setDistributedJoins(true).setEnforceJoinOrder(true)) - .getAll().get(0).get(0); - - X.println("Plan : " + plan); + .setDistributedJoins(true)).getAll().get(0).get(0); - assertEquals(1, StringUtils.countOccurrencesOf(plan, "batched")); - assertEquals(1, StringUtils.countOccurrencesOf(plan, "batched:broadcast")); - assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true) - .setEnforceJoinOrder(true)).getAll().size()); + X.println("Plan: " + plan); - assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true) - .setEnforceJoinOrder(true)).getAll().size()); + assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched")); + assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size()); select = "select * from (" + select1 + ")"; plan = (String)c1.query(new SqlFieldsQuery("explain " + select) - .setDistributedJoins(true).setEnforceJoinOrder(true)) - .getAll().get(0).get(0); + .setDistributedJoins(true)).getAll().get(0).get(0); X.println("Plan : " + plan); - assertEquals(1, StringUtils.countOccurrencesOf(plan, "batched")); - assertEquals(1, StringUtils.countOccurrencesOf(plan, "batched:broadcast")); - assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true) - .setEnforceJoinOrder(true)).getAll().size()); + assertEquals(0, StringUtils.countOccurrencesOf(plan, "batched")); + assertEquals(2, c1.query(new SqlFieldsQuery(select).setDistributedJoins(true)).getAll().size()); } finally { c1.destroy(); @@ -398,6 +385,104 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { caches.add(orgRepl); try { + // Join multiple. + + { + String sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgPart\".Organization o " + + "where p1.affKey=p2._key and p2.orgId = o._key"; + + checkQueryPlanContains(persPart, + true, + 2, + sql, + "batched:unicast", "batched:unicast"); + + checkQueryPlanContains(persPart, + false, + 2, + sql, + "batched:unicast", "batched:unicast"); + } + + { + String sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgPart\".Organization o " + + "where p1.affKey > p2._key and p2.orgId = o._key"; + + checkQueryPlanContains(persPart, + true, + 2, + sql, + "batched:broadcast", "batched:unicast"); + + checkQueryPlanContains(persPart, + false, + 2, + sql, + "batched:broadcast", "batched:unicast"); + } + + { + // First join is collocated, second is replicated. + + String sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " + + "where p1.affKey=p2._key and p2.orgId = o._key"; + + checkQueryPlanContains(persPart, + true, + 0, + sql); + + checkQueryPlanContains(persPart, + false, + 0, + sql); + } + + { + String sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"persPartAff\".Person2 p1, \"persPart\".Person2 p2, \"orgRepl\".Organization o " + + "where p1._key=p2._key and p2.orgId = o._key"; + + checkQueryPlanContains(persPart, + false, + 1, + sql, + "batched:unicast"); + + sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"orgRepl\".Organization o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " + + "where p1._key=p2._key and p2.orgId = o._key"; + + checkQueryPlanContains(persPart, + false, + 1, + sql, + "batched:unicast"); + + sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from \"persPartAff\".Person2 p1, \"orgRepl\".Organization o, \"persPart\".Person2 p2 " + + "where p1._key=p2._key and p2.orgId = o._key"; + + checkQueryPlanContains(persPart, + false, + 1, + sql, + "batched:unicast"); + + sql = "select p1._key k1, p2._key k2, o._key k3 " + + "from (select * from \"orgRepl\".Organization) o, \"persPartAff\".Person2 p1, \"persPart\".Person2 p2 " + + "where p1._key=p2._key and p2.orgId = o._key"; + + checkQueryPlanContains(persPart, + false, + 1, + sql, + "batched:unicast"); + } + // Join two partitioned. checkQueryPlanContains(persPart, @@ -476,6 +561,20 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { false, 0, "select p._key k1, o._key k2 " + + "from \"persPart\".Person2 p, (select * from \"orgRepl\".Organization) o " + + "where p.orgId = o._key"); + + checkQueryPlanContains(persPart, + false, + 0, + "select p._key k1, o._key k2 " + + "from (select * from \"orgRepl\".Organization) o, \"persPart\".Person2 p " + + "where p.orgId = o._key"); + + checkQueryPlanContains(persPart, + false, + 0, + "select p._key k1, o._key k2 " + "from \"persPart\".Person2 p inner join \"orgRepl\".Organization o " + "on p.orgId = o._key"); @@ -487,28 +586,26 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { "on p.orgId = o._key"); checkQueryPlanContains(persPart, - true, - 1, + false, + 0, "select p._key k1, o._key k2 " + "from \"orgRepl\".Organization o, \"persPart\".Person2 p " + - "where p.orgId = o._key", - "batched:broadcast"); + "where p.orgId = o._key"); checkQueryPlanContains(persPart, - true, - 1, + false, + 0, "select p._key k1, o._key k2 " + "from \"orgRepl\".Organization o inner join \"persPart\".Person2 p " + - "on p.orgId = o._key", - "batched:broadcast"); + "on p.orgId = o._key"); - checkQueryPlanContains(persPart, - true, - 1, - "select p._key k1, o._key k2 " + - "from \"orgRepl\".Organization o left outer join \"persPart\".Person2 p " + - "on p.orgId = o._key", - "batched:broadcast"); +// checkQueryPlanContains(persPart, +// true, +// 1, +// "select p._key k1, o._key k2 " + +// "from \"orgRepl\".Organization o left outer join \"persPart\".Person2 p " + +// "on p.orgId = o._key", +// "batched:broadcast"); // Join on affinity keys. @@ -623,7 +720,7 @@ public class IgniteSqlSplitterSelfTest extends GridCommonAbstractTest { int startIdx = 0; for (String exp : expText) { - int idx = exp.indexOf(exp, startIdx); + int idx = plan.indexOf(exp, startIdx); if (idx == -1) { fail("Plan does not contain expected string [startIdx=" + startIdx +
